/
Converting pull to push using Picos

Converting pull to push using Picos

There are thousands of web sites which have an API, so that a page in a web browser can pull data from them. There are fewer which will accept a web hook and push data to a web server.

This project is about using a pico to periodically pull data from the first kind of web site and then push it out to other picos. The consuming picos would generally be on other engines.

Architecture

The periodic pulling will be done by one pico running in a pico engine. Each burst of data pulled from the source will then be sent to a pico on the same engine whose job it is to route the data to all consumer picos. The consumer picos can be running on the same engine or on engines elsewhere on the Internet. Consumer picos need not be identical, nor owned/controlled by the same people.

Here is a collage showing some of the picos in three different instances of the node pico engine, all as it happens running different versions. One engine is running a generator, router, and consumer. The other two are each running a consumer pico. When the generator produces a random verse and sends it to the router, the router will send it to each of the three consumers.

In addition to the rulesets used in the pulling pico, the routing pico, and the consumer picos, there will be a ruleset which will be used as a module to provide any secrets needed in connection with using the source API. We will also provide a ruleset (intended for use as a module) with data and functions peculiar to the data itself, which may be useful to consumers. 

This is closely related to the Modules and External APIs Lesson.

Specific example

The specific data source is an API which, upon each request, returns a random verse from the Book of Mormon. But the principles apply more generally.

All of the code for this project is contained in a GitHub repository.

Pulling data from the source

Manual creation of the main application pico

We manually (through the UI) create a pico, which we will name "bofm generator". Once it is created, we install the the ruleset bofm.main, from the repository, again using the UI, as shown here:

When a ruleset is installed in a pico, the event wrangler:ruleset_installed is sent to the pico (because the UI does this by sending it the event wrangler:new_ruleset, and that event raises the wrangler event "ruleset_installed" when the job is done).

Programmatic creation of the router pico

The bofm.main ruleset reacts to this event by selecting it in the initialize rule shown here in line 3. Line 4 selects when the event arrives, and ensures that the rule will be selected only if the ruleset identifiers (rids) of the ruleset which was installed includes (><) the RID of this ruleset (meta:rid). Line 6 binds the name bofmBase to the URL from which this ruleset was registered and installed, line 7 binds the name bofmURL to the relative URL of the ruleset which will be needed by the router pico, and the action in line 9 has the engine register the bofm.router ruleset (introduced below) with the engine.

ruleset bofm.main {
  ...
  rule initialize {
    select when wrangler ruleset_installed where rids >< meta:rid
    pre {
      bofmBase = meta:rulesetURI;
      bofmURL = "bofm.router.krl";
    }
    engine:registerRuleset(bofmURL,bofmBase);
    fired {
      raise wrangler event "new_child_request"
        attributes { "dname": "bofm router", "color": "#87cefa",
                     "rids": "bofm.router;io.picolabs.subscription" };
      raise wrangler event "install_rulesets_requested"
        attributes { "rids": "io.picolabs.subscription" };
    }
  }
  ...
}

Finally, this rule raises two events (to the same pico).

The first raised event is a new_child_request asking for a child pico named "bofm router" to be created, having two rulesets pre-installed. The first of these, with RID bofm.router is the one we ensured was registered in the engine, and the second is the system-supplied io.picolabs.subscription ruleset.

The second raised event will result in wrangler installing the subscription ruleset in this, the "bofm generator" pico.

Connecting the two picos with a subscription

When wrangler has finished creating the child pico, that pico will send the event wrangler:child_initialized to the generator pico. This rule will raise an event to programmatically set up a subscription between "bofm generator" (playing the role of "generator") and the newly created pico (playing the role of "router"). We'll see later how this subscription request gets approved. For more discussion on subscriptions, see the Pico to Pico Subscriptions Lesson.

ruleset bofm.main {
  ...
  rule new_router {
    select when wrangler child_initialized
    pre {
      child_eci = event:attr("eci");
    }
    fired {
      raise wrangler event "subscription" attributes
        { "name" : "to_router",
          "name_space" : "bofm",
          "my_role" : "generator",
          "subscriber_role" :"router",
          "channel_type" : "subscription",
          "subscriber_eci" : child_eci
        }
    }
  }
  ...
}

Operation of the main pico

The "bofm generator" pico will periodically pull a verse from the random source and foward it to the "bofm router" pico. But we may not want it to run continously, and we want to be able to shut it down when necessary.

Starting and stopping

The "bofm generator" pico expects two events, to turn it off and on. When a ruleset is initially installed in a pico, it has no entity variables. So, turning off the generator is as simple as clearing the entity variables it will be using. To turn it off, we must send it the event bofm:idle_request with no attributes. Line 4 selects the bofm_idle_request rule when this event arrives. Line 5 performs the action of removing a scheduled event if there is one. And if the rule fires, then it clears two entity variables (lines 7-8).

ruleset bofm.main {
  ...
  rule bofm_idle_request {
    select when bofm idle_request
    if ent:schedule_id then schedule:remove(ent:schedule_id);
    fired {
      clear ent:schedule_id;
      clear ent:minutes;
    }
  }
  ...
}

Turning the generator on is accomplished by sending our pico the event bofm:start_request with an attribute named minutes indicating how frequently we require it to pull data from the source. Line 4 selects the bofm_start_request rule but only when the minutes attribute is a sequence of digits with at least one digit. If that sequence of digits is a number no smaller than one, the rule will fire. When the rule fires, it will store away the number of minutes into an entity variable and raise the bofm event "verse_needed" which starts the process. Finally, the last statement (line 9) prevents evaluation of any other rules that might have been scheduled for this event.

The second rule, bofm_start_every_minute, will select on bofm:start_request and will be evaluated unless the bofm_start_request rule fired. It provides a default interval of one minute, and will fire if the event didn't have a minutes attribute, or if that attribute had no digits, or if it was a number less than one.

ruleset bofm.main {
  ...
  rule bofm_start_request {
    select when bofm start_request minutes re#(\d+)# setting(minutes)
    if minutes.as("Number") >= 1 then noop();
    fired {
      ent:minutes := minutes;
      raise bofm event "verse_needed";
      last;
    }
  }
  rule bofm_start_every_minute {
    select when bofm start_request
    fired {
      ent:minutes := 1;
      raise bofm event "verse_needed";
    }
  }
  ...
}

Regular periodic operation

The pico will react to the bofm:verse_needed event by requesting a random verse from the source. The source is wrapped by a KRL module with RID bofm.random, to which we will refer by the name bofm within the ruleset. The use of this module is declared in line 3. When the event bofm:verse_needed arrives, the rule bofm_verse_needed will be selected (by line 8). this rule will obtain a verse from the random source, binding it to the name v. It will then arrange for a schedule "bump" (line 13) and raise the bofm event "verse_to_route" (to itself) passing the verse as an attribute (line 14). The rule bofm_schedule_bump will thus be added to the schedule, selected by line 18, and it will schedule another "verse_needed" event to occur ent:minutes in the future (line 20). The ID of the scheduled event will be stored into ent:schedule_id by lines 21-22.

ruleset bofm.main {
  meta {
    use module bofm.random alias bofm
    ...
  }
  ...
  rule bofm_verse_needed {
    select when bofm verse_needed
    pre {
      v = bofm:verse();
    }
    fired {
      raise bofm event "schedule_bump";
      raise bofm event "verse_to_route" attributes v;
    }
  }
  rule bofm_schedule_bump {
    select when bofm schedule_bump
    fired {
      schedule bofm event "verse_needed" at time:add(time:now(), {"minutes": ent:minutes})
        setting(schedule_id);
      ent:schedule_id := schedule_id;
    }
  }
  ...
}

This establishes the periodic pulling of a random verse. A verse is a simple map. Although the bofm.random module is not in the repository, we can see the structure in this sample code depicting a very simple provider of random verses from a list of one (as opposed to the 6604 verses used by the actual random source).

ruleset bofm.random {
  meta {
    provides verse
  }
  global {
    verse = function() {
      { "1 Nephi 7:3": "I will go and do..." }
    }
  }
}

Routing a verse to consumers

Each time we pull a random verse, the event bofm:verse_to_route is raised, and it selects the rule bofm_verse_to_route (line 16). This rule sets up a loop over the router subscriptions (obtained by the function defined in lines 9-12). For each such subscription, line 19 binds the name eci to the event channel id (ECI) of the router pico. We send the event bofm:verse to each of these router picos, passing along the data obtained from the random source in the event attributes.

ruleset bofm.main {
  meta {
    ...
    use module io.picolabs.subscription alias subscription
    ...
  }
  global {
    ...
    my_routers = function() {
      subscription:getSubscriptions(["attributes","subscriber_role"],"router")
        .map(function(v){v.values().head()})
    }
  }
  ...
  rule bofm_verse_to_route {
    select when bofm verse_to_route
    foreach my_routers() setting(s)
    pre {
      eci = s{["attributes","outbound_eci"]};
    }
    event:send({"eci":eci, "domain": "bofm", "type": "verse", "attrs": event:attrs()});
  }
  ...
}

Operation of the routing pico

Initialization

All that is done when the ruleset is installed into the "bofm router" pico is to initialize the entity variable ent:consumers to be an empty map.

ruleset bofm.router {
  ...
  rule initialize {
    select when wrangler ruleset_installed where rids >< meta:rid
    fired {
      ent:consumers := {};
    }
  }
  ...
}

Accepting the subscription request from the main pico

Done in the standard way by the autoAccept rule.

ruleset bofm.router {
  ...
  rule autoAccept {
    select when wrangler inbound_pending_subscription_added
    pre{
      attributes = event:attrs();
    }
    always{
      raise wrangler event "pending_subscription_approval"
          attributes attributes;
    }
  }
  ...
}

Doing the actual routing

Whe the "bofm router" pico receives the event bofm:verse, the incoming_verse rule selected by line 4 evaluates. This rule sets up a loop over all of its consumers (line 5) and sends each one of them the event bofm:verse with the same attributes (line 6) possibly on a different pico engine (line 7).

ruleset bofm.router {
  ...
  rule incoming_verse {
    select when bofm verse
    foreach ent:consumers setting(host,eci)
    event:send({"eci":eci, "domain": "bofm", "type": "verse", "attrs": event:attrs()},
      host);
  }
}

Accepting a new consumer

The pico reacts to an event bofm:new_consumer which includes an attribute named consumer_eci by storing an entry in its ent:consumers map with the customer ECI as the key, and the value of the attribute host, if present, as the value.

ruleset bofm.router {
  ...
  rule bofm_new_consumer {
    select when bofm new_consumer consumer_eci re#(.+)# setting(consumer_eci)
    fired {
      ent:consumers{consumer_eci} := event:attr("host");
    }
  }
  ...
}

When a consumer goes away

The pico reacts to an event bofm:consumer_removed by clearing that entry from its map.

ruleset bofm.router {
  ...
  rule bofm_consumer_removed {
    select when bofm consumer_removed consumer_eci re#(.+)# setting(consumer_eci)
    fired {
      clear ent:consumers{consumer_eci};
    }
  }
  ...
}

Consuming the stream of events

Operation of consumer picos

A minimal ruleset for consumers would look like this.

ruleset bofm.consumer {
}

Notice that this ruleset does not react to the bofm:verse event (the only event, so far, sent to a consumer pico). This is perfect acceptable. Ignoring events is allowed.

In fact, once you designate a pico to function as a consumer, and register it with the generator, you could watch the bofm:verse events coming in on that pico's "Logging" tab. Until you have a ruleset installed which reacts to the event, your pico won't do anything with the events, but they do appear in the logs. Showing here the second of six such events (one per minute) to a consumer pico. Notice that the event is received and processing begins, but there are no salient rules to schedule, so nothing else happens and the events are ignored. While this entire system of picos was running, you could write and install a ruleset to react to the event. Once you installed it, its rules would begin to be evaluated. There is no need to stop and restart any pico engines to make this happen; you simply install your ruleset and it begins to take part in the action.

Storing verses for future use

A more realistic ruleset might, at least, store away the incoming verses, for some future use. This code shows one way of doing that. The rule initialize sets up a couple of maps. Notice the alternate event pico:ruleset_installed with a slightly different where clause (in line 4). This is for backwards compatibility, for pico engines of earlier versions. The rule bofm_verse selects when a bofm:verse event arrives, and it reacts by handling each event, setting the name txt to the value of the (single) attribute and ref to the name of the (single) attribute. Finally, it makes an entry in the ent:refs map with the current time as the key and the verse reference as the value. It also adds an entry to the ent:txts map with the verse reference as the key and the text of the verse as the value.

ruleset bofm.consumer {
  rule initialize {
    select when wrangler ruleset_installed where rids >< meta:rid
    fired {
      ent:refs := {};
      ent:txts := {};
    }
  }
  rule bofm_verse {
    select when bofm verse
    foreach event:attrs() setting(txt,ref)
    fired {
      ent:refs{time:now()} := ref;
      ent:txts{ref} := txt;
    }
  }
}

This starting ruleset, bofm.consumer, is avaiable in the GitHub repository.

Sample data

A small sample, and an excerpt at that, of what might be held by a consumer pico is shown here. Note that references are ordered by timestamp, while texts are ordered by lexical ordering of the reference.

ent:refs={"2017-12-06T21:02:57.199Z":"Alma 14:8",
          ...
          "2017-12-08T16:38:43.539Z":"Mosiah 4:10",
          ...
          "2017-12-08T17:17:51.484Z":"Helaman 3:16"}
ent:txts={...
          "Alma 14:8":"And they brought...",
          ...
          "Helaman 3:16":"And they have been...",
          ...
          "Mosiah 4:10":"And again, believe...",
          ...}

Possible exercises

Display saved verses in some way

A reader might try to put the data collected by the consumer pico into a simple web site, such as these two screenshots show. First a selection list of all of the verses stored, in a canonical order, such that when the viewer clicks the button,

the stored text is shown in some formatted way.

One possible solution to this exercise is in the repository. Notice that this solution makes use of a module whose RID is bofm.data which contains some data and functions (but no rules). It would need to be registered with the engine before it could be used (but does not need to be installed in any pico).

Send a text message

A reader might try to send an incoming verse as a text message to their smart phone under certain conditions (from a specific book, once an hour, at a specific time of day, etc.). The Modules and External APIs Lesson would provide enough information to accomplish this, using Twilio.

A quiz of some kind

A reader might try to display a random text from those recieved, and quiz for the reference, or some part of it. For example, "Which book contains this verse: <verse>" along with a drop-down list of all the books.

Build something similar for a different web data source

Choose one of the thousands of web APIs which provide information which changes regularly, such as weather data, or some such. Write a system of picos which pulls this data periodically and then pushes it out to web hooks or to other picos.

Adapt and harden the generator and routers

An alert reader may have noticed that the generator pico can have more than one router. However, a new consumer would always be assigned to the first of these. Adapt the system so that it handles several router picos and many many consumers.

Come up with your own ideas

A data module

The repository also contains a ruleset intended to be used as a module. It consists solely of data about the Book of Mormon and a small collection of useful functions. To be used, this ruleset must be registered in the engine running your consumer pico, but does not need to be installed therein. We'll look at it piecemeal here. In line 3, the ruleset commits to providing three functions, ref_parts, ref_cmpbook_chapters, described below, and an array of book_names as defined in lines 6-10.

ruleset bofm.data {
  meta {
    provides ref_parts, ref_cmp, book_names, book_chapters
  }
  global {
    book_names = [
      "1 Nephi", "2 Nephi", "Jacob", "Enos", "Jarom",
      "Omni", "Words of Mormon", "Mosiah", "Alma", "Helaman",
      "3 Nephi", "4 Nephi", "Mormon", "Ether", "Moroni"
    ]
  ...
  }
}

The sample consumer ruleset would refer to the module, and its provided array of book names, as shown here. You may choose your own alias rather than using bofm. You refer to things provided by the module you are using by mentioning the alias you choose, a colon, and the name of the thing from the module.

ruleset bofm.consumer {
  meta {
    use module bofm.data alias bofm
    ...
  }
  ... bofm:book_names ...
}

Two of the functions are useful in taking apart verse references and sorting them as they appear in the Book of Mormon. The first of these functions, ref_parts, takes in a verse reference string and returns an array consisting of the book name, the chapter number, and the verse number, as defined in line 7. You can find more information in Regular Expressions and the extract operator is defined in the String Operators page. Line 8 is a utility function, used by, but not shared by the module. Lines 9-17 define a provided function named ref_cmp which takes two verse reference strings (ref_1,and ref_2) and returns either -1, 0, or +1 according as the first verse reference precedes, is the same as, or follows the second verse reference. This function is suitable for use as the argument passed to the sort operator defined in the Array Operators page to sort an array of verse references.

ruleset bofm.data {
  meta {
    provides ref_parts, ref_cmp, book_names, book_chapters
  }
  global {
  ...
    ref_parts = function(ref) { ref.extract(re#(.+) (\d+):(\d+)#) }
    num_cmp = function(a,b) { a.as("Number") <=> b.as("Number") }
    ref_cmp = function(ref_1,ref_2) {
      r1 = ref_parts(ref_1);
      r2 = ref_parts(ref_2);
      verse_cmp = function() { num_cmp(r1[2],r2[2]) };
      chapter_cmp = function() {
        ans = num_cmp(r1[1],r2[1]); ans => ans | verse_cmp() };
      book_cmp = book_names.index(r1[0]) <=> book_names.index(r2[0]);
      book_cmp => book_cmp | chapter_cmp()
    }
  ...
  }
}

The final function in the bofm.data module is book_chapters which when given the name of a book (the head of the array returned by ref_parts) returns the number of chapters the book contains. Notice the internal use of an array chapters which is not provided to the outside world by this ruleset-as-module.

ruleset bofm.data {
  meta {
    provides ref_parts, ref_cmp, book_names, book_chapters
  }
  global {
  ...
    chapters = [ 22, 33, 7, 1, 1, 1, 1, 29, 63, 16, 30, 1, 9, 15, 10 ]
    book_chapters = function(book_name) {
      book_number = book_names.index(book_name);
      book_number >= 0 => chapters[book_number] | 0
    }
  }
}

This function might be used by a consumer ruleset in this manner. The operator head is one of the Array Operators.

ruleset bofm.consumer {
  meta {
    use module bofm.data alias bofm
    ...
  }
  ... how_many_chapters = bofm:book_chapters( bofm:ref_parts( my_verse_reference ).head() ) ...
}

Copyright Picolabs | Licensed under Creative Commons.