...
Fuse uses the scatter-gather pattern in creating weekly vehicle reports. We've already seen the process_periodic_report_with_rcn
rule in the Event the Event Recipient Lists patternLists pattern. This rule scatters events telling the vehicles that the fleet needs the vehicle report. The rules we just saw in the Aggregator pattern the Aggregator pattern are the gathering part of this set of rulesruleset.
When we combine the pictures from those two patterns, we get a set up that looks like this:
...
The owner pico kicks everything off by sending the request_periodic_report
event to the fleet. The start_periodic_report
rule in the fleet pico scatters the periodic_vehicle_report
event to each vehicle in the fleet, whether there's 1 or 1001000. Of course, these events are asynchronous as well. Consequently the vehicle picos are not under time pressure to complete.
...
In the case of Fuse, the start_periodic_report
rule that we saw in the section the section on Correlating Events also Events also schedules the periodic_report_timer_expired
event for two minutes in the future:
...
Code Block |
---|
rule retry_from_expired_timer { select when explicit periodic_report_timer_expired pre { max_retries = 2; rcn = event:attr("report_correlation_number"); tz = event:attr("timezone"); vehicle_summaries = vehicleSummary(); vehicle_reports = ent:vehicle_reports{[rcn,"reports"]} .defaultsTo([]); vehicle_summaries_keys = vehicle_summaries.map(function(r){r{"deviceId"}}); vehicle_reports_keys = vehicle_reports.map(function(r){r{"deviceId"}}); missing_vehicles = vehicle_summaries_keys .difference(vehicle_reports_keys); in_array = function(k,a){ a.filter(function(x){x eq k}).length() > 0; }; needed = vehicle_summaries.filter(function(s){in_array(s{"deviceId"}, missing_vehicles)}); rcn_unprocessed = not ent:vehicle_reports{rcn}.isnull(); } if ( needed.length() > 0 && ent:retry_count < max_retries ) then { noop(); } fired { log info "Retrying for " + (needed.length()) + " vehicles"; ent:retry_count := ent_retry_count+1; raise fuse event "periodic_report_start" attributes { "vehicle_summaries": needed, "timezone": tz, "report_correlation_number": rcn } if rcn_unprocessed; schedule explicit event "periodic_report_timer_expired" at time:add(time:now(),{"minutes" : 2}) attributes {"report_correlation_number": rcn, "timezone": tz} } else { clear ent:retry_count; raise explicit event "periodic_report_ready" with report_correlation_number = rcn if rcn_unprocessed; } } |
...