Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
rule start_periodic_report {
  select when fuse periodic_report_start
  pre {
    new_rcn = genCorrelationNumber();
    rcn = event:attr("report_correlation_number")
	       .defaultsTo(new_rcn);
    ...
    report_data = {"period": period,
		   "start": start,
		   "end": end,
		   "timezone": tz};
    augmented_attrs = event:attrs()
                       .put(["report_correlation_number"], rcn);
  }
  fired {
    raise explicit event "periodic_report_routable"
      with attributes augmented_attrs;

   set ent:report_data{rcn} := report_data;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  }
}

...

Code Block
rule route_to_owner {
  select when fuse new_fleet
           or fuse reminders_ready
           or fuse email_for_owner
  pre {
    owner_subs =
       CloudOS:subscriptionList(common:namespace(),wrangler:established().filter(function(sub){sub{Tx_role} == "FleetOwner"});
    // find the owner who contacted us (could be more than one)
    matching_owner = owner_subs.filter(
              function(sub){ sub{"backChannel"} eq meta:eci()
                           }
              );
    // use first owner if no match
    owner_list = matching_owner.length() > 0 => matching_owner
                                              | owner_subs;
    owner = owner_list.head().pick("$.eventChannel");
  }
  {
    send_directive("Routing to owner")
      with channel = owner 
       and attrs = event:attrs();
    event:send({"cid": owner}, "fuse", event:type())
      with attrs = event:attrs();
  }
}

...

Code Block
rule register_picos {
  select when system pico_registration
  pre {
    topic = event:attr("topic");
    pico_data =  makeRegistrationRecord(event:attrs());
  }
  if(not pico_data{"eci"}.isnull()) then noop();
  fired {
     set ent:registrations{topic} := pico_data
  }
}

Commentary:

...

Code Block
rule catch_periodic_vehicle_reports {
  select when fuse periodic_vehicle_report_created

  pre {
    vehicle_id = event:attr("vehicle_id");
    rcn = event:attr("report_correlation_number");
    updated_vehicle_reports =
        (ent:vehicle_reports{[rcn,"reports"]})
		  .defaultsTo([])
	          .append(event:attr("vehicle_details").decode());

  }
  noop();
  always {
    set ent:vehicle_reports{[rcn,"reports"]} := updated_vehicle_reports;
    raise explicit event periodic_vehicle_report_added with
      report_correlation_number = rcn
  }

}    

...

Code Block
rule check_periodic_report_status {
  select when explicit periodic_vehicle_report_added
  pre {
    rcn = event:attr("report_correlation_number");
    vehicles_in_fleet = activeVehicleSummary().length();
    number_of_reports_received = (ent:vehicle_reports{[rcn,"reports"]})
				   .length();
  }
  if ( vehicles_in_fleet <= number_of_reports_received ) then noop();
  fired {
    log info "process vehicle reports ";
    raise explicit event "periodic_report_ready" with
      report_correlation_number = rcn;
  } else {
    log info "we're still waiting for " +
        (vehicles_in_fleet - number_of_reports_received) +
	" reports on #{rcn}";
  }
}

...

Code Block
rule retry_from_expired_timer {
  select when explicit periodic_report_timer_expired
  pre {
    max_retries = 2;
    rcn = event:attr("report_correlation_number");
    tz = event:attr("timezone");
    vehicle_summaries = vehicleSummary();
    vehicle_reports = ent:vehicle_reports{[rcn,"reports"]}
                        .defaultsTo([]);
    vehicle_summaries_keys = vehicle_summaries
                               .map(function(r){r{"deviceId"}});
    vehicle_reports_keys = vehicle_reports
                             ..map(function(r){r{"deviceId"}});

    missing_vehicles = vehicle_summaries_keys
			   .difference(vehicle_reports_keys);
    in_array = function(k,a){
      a.filter(function(x){x eq k}).length() > 0;
    };
    needed = vehicle_summaries.filter(
                 function(s){
		   in_array(s{"deviceId"}, missing_vehicles)
		 });

    rcn_unprocessed = not ent:vehicle_reports{rcn}.isnull();
  }
  if ( needed.length() > 0
    && ent:retry_count < max_retries
     ) then {
    noop();
  }
  fired {
    log info "Retrying for " + (needed.length()) + " vehicles";
    set ent:retry_count := ent_retry_count+1;
    raise fuse event periodic_report_start attributes {
       "vehicle_summaries": needed,
       "timezone": tz,
       "report_correlation_number": rcn 
      } if rcn_unprocessed;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  } else {
    clear ent:retry_count;
    raise explicit event "periodic_report_ready" with
      report_correlation_number = rcn if rcn_unprocessed;
  }
}

...

The concurrent processing is done without locks of any kind. Because each pico is independent, they have no need of locks when operating concurrently. The fleet pico could receive events from multiple vehicles, but they are queued and handled in turn. Consequently, we don't need locks inside the pico either. Lockless concurrency is a property of Actor-model systems like picos.In general, I'm pretty happy with how this works and it was fun to think about. Next time I'm faced with a similar problem, scatter-gather will be my first choice, not the one I use after the synchronous solution fails.

...

Notes

  1. I recommend Vaughn's book for anyone interested in picos. While the language/framework (Scala and Akka) is different, the concepts are all very similar. There's a lot of good information that can be directly applied to programming picos.

...