...
Code Block |
---|
rule start_periodic_report { select when fuse periodic_report_start pre { new_rcn = genCorrelationNumber(); rcn = event:attr("report_correlation_number") .defaultsTo(new_rcn); ... report_data = {"period": period, "start": start, "end": end, "timezone": tz}; augmented_attrs = event:attrs() .put(["report_correlation_number"], rcn); } fired { raise explicit event "periodic_report_routable" with attributes augmented_attrs; set ent:report_data{rcn} := report_data; schedule explicit event "periodic_report_timer_expired" at time:add(time:now(),{"minutes" : 2}) attributes {"report_correlation_number": rcn, "timezone": tz} } } |
...
Code Block |
---|
rule route_to_owner { select when fuse new_fleet or fuse reminders_ready or fuse email_for_owner pre { owner_subs = CloudOS:subscriptionList(common:namespace(),wrangler:established().filter(function(sub){sub{Tx_role} == "FleetOwner"}); // find the owner who contacted us (could be more than one) matching_owner = owner_subs.filter( function(sub){ sub{"backChannel"} eq meta:eci() } ); // use first owner if no match owner_list = matching_owner.length() > 0 => matching_owner | owner_subs; owner = owner_list.head().pick("$.eventChannel"); } { send_directive("Routing to owner") with channel = owner and attrs = event:attrs(); event:send({"cid": owner}, "fuse", event:type()) with attrs = event:attrs(); } } |
...
Code Block |
---|
rule register_picos { select when system pico_registration pre { topic = event:attr("topic"); pico_data = makeRegistrationRecord(event:attrs()); } if(not pico_data{"eci"}.isnull()) then noop(); fired { set ent:registrations{topic} := pico_data } } |
Commentary:
...
Code Block |
---|
rule catch_periodic_vehicle_reports { select when fuse periodic_vehicle_report_created pre { vehicle_id = event:attr("vehicle_id"); rcn = event:attr("report_correlation_number"); updated_vehicle_reports = (ent:vehicle_reports{[rcn,"reports"]}) .defaultsTo([]) .append(event:attr("vehicle_details").decode()); } noop(); always { set ent:vehicle_reports{[rcn,"reports"]} := updated_vehicle_reports; raise explicit event periodic_vehicle_report_added with report_correlation_number = rcn } } |
...
Code Block |
---|
rule check_periodic_report_status { select when explicit periodic_vehicle_report_added pre { rcn = event:attr("report_correlation_number"); vehicles_in_fleet = activeVehicleSummary().length(); number_of_reports_received = (ent:vehicle_reports{[rcn,"reports"]}) .length(); } if ( vehicles_in_fleet <= number_of_reports_received ) then noop(); fired { log info "process vehicle reports "; raise explicit event "periodic_report_ready" with report_correlation_number = rcn; } else { log info "we're still waiting for " + (vehicles_in_fleet - number_of_reports_received) + " reports on #{rcn}"; } } |
...
Code Block |
---|
rule retry_from_expired_timer { select when explicit periodic_report_timer_expired pre { max_retries = 2; rcn = event:attr("report_correlation_number"); tz = event:attr("timezone"); vehicle_summaries = vehicleSummary(); vehicle_reports = ent:vehicle_reports{[rcn,"reports"]} .defaultsTo([]); vehicle_summaries_keys = vehicle_summaries .map(function(r){r{"deviceId"}}); vehicle_reports_keys = vehicle_reports ..map(function(r){r{"deviceId"}}); missing_vehicles = vehicle_summaries_keys .difference(vehicle_reports_keys); in_array = function(k,a){ a.filter(function(x){x eq k}).length() > 0; }; needed = vehicle_summaries.filter( function(s){ in_array(s{"deviceId"}, missing_vehicles) }); rcn_unprocessed = not ent:vehicle_reports{rcn}.isnull(); } if ( needed.length() > 0 && ent:retry_count < max_retries ) then { noop(); } fired { log info "Retrying for " + (needed.length()) + " vehicles"; set ent:retry_count := ent_retry_count+1; raise fuse event periodic_report_start attributes { "vehicle_summaries": needed, "timezone": tz, "report_correlation_number": rcn } if rcn_unprocessed; schedule explicit event "periodic_report_timer_expired" at time:add(time:now(),{"minutes" : 2}) attributes {"report_correlation_number": rcn, "timezone": tz} } else { clear ent:retry_count; raise explicit event "periodic_report_ready" with report_correlation_number = rcn if rcn_unprocessed; } } |
...
The concurrent processing is done without locks of any kind. Because each pico is independent, they have no need of locks when operating concurrently. The fleet pico could receive events from multiple vehicles, but they are queued and handled in turn. Consequently, we don't need locks inside the pico either. Lockless concurrency is a property of Actor-model systems like picos.In general, I'm pretty happy with how this works and it was fun to think about. Next time I'm faced with a similar problem, scatter-gather will be my first choice, not the one I use after the synchronous solution fails.
...
Notes
I recommend Vaughn's book for anyone interested in picos. While the language/framework (Scala and Akka) is different, the concepts are all very similar. There's a lot of good information that can be directly applied to programming picos.
...