Managing Subscriptions

Introduction to Subscriptions

To understand how subscriptions between picos work, it is important to understand what a pico channel is and what channel policies are. Inter-pico subscriptions use pico channels to implement the subscription protocol, which is itself independent of picos.

Subscriptions allow two picos to communicate with each other, enabling the creation of peer-to-peer structures. From each pico's perspective there is a receiving channel, Rx, and a transmitting channel, Tx. Each pico receives events and queries from the other pico through its Rx channel, and sends events and makes queries to the other pico using its Tx channel. Note, however, that there are only two pico channels in total; each pico simply sees them from its own perspective. For example, pico A records a Tx channel and an Rx channel, but the Tx channel that pico A has recorded is actually pico B's Rx channel, and vice versa.
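To make the two perspectives concrete, the sketch below shows what each pico might record for a single subscription. The ruleset name and the ECI strings are made-up placeholders for illustration, not values produced by Wrangler.

```krl
ruleset example.tx_rx_perspective {
  global {
    // Hypothetical ECIs, for illustration only
    channelOwnedByA = "eci-owned-by-pico-A"
    channelOwnedByB = "eci-owned-by-pico-B"

    // Pico A receives on its own channel and sends to pico B's channel
    picoA_view = { "Rx": channelOwnedByA, "Tx": channelOwnedByB }

    // Pico B records the same two channels with the labels swapped
    picoB_view = { "Rx": channelOwnedByB, "Tx": channelOwnedByA }

    // Holds because there are only two channels, seen from two perspectives
    sameChannels = picoA_view{"Tx"} == picoB_view{"Rx"}
                && picoA_view{"Rx"} == picoB_view{"Tx"}
  }
}
```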


Subscriptions can occur between any two picos as long as the picos have a way to communicate (over HTTP or on the same engine). For example, in the Forever application, the picos belonged to different people and represented their personal contact information. The io.picolabs.subscription ruleset is what implements subscriptions, so both picos need to have it installed.

Subscriptions are established by pico A sending a request to establish a subscription to pico B, and pico B then accepting that request.

Well Known Rx

When the io.picolabs.subscription ruleset is installed on a pico (it is preinstalled on every pico created by Wrangler), it creates a pico channel named the "wellKnown_Rx" channel. This channel is what receives requests to create subscriptions. The channel has a policy that only allows events related to accepting a subscription. This allows the ECI of this "wellKnown" channel to be shared publicly without fear that someone will send events that interfere with the pico or send queries for information they are not allowed to have. Although any channel that does not block the subscription events will work when creating subscriptions, it is considered good practice to use the wellKnown_Rx channel unless the subscription is between a parent and child pico.

As a reminder, a channel contains three pieces of information: the Event Channel Identifier (ECI), tags, and policies. The well known channel is tagged wellKnown_Rx and Tx_Rx. This channel is used to establish new subscriptions between picos, and is meant to be publicly shared with potential subscribers.

A pico's wellKnown_Rx channel can be accessed by querying the subscription ruleset's wellKnown_Rx() function.
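As a rough sketch, a ruleset could expose the well known ECI through a shared function of its own. The ruleset name and the wellKnownEci function are hypothetical, and the code assumes that wellKnown_Rx() returns the channel as a map whose "id" key holds the ECI; check the return value on your engine version before relying on it.

```krl
ruleset example.show_well_known {
  meta {
    use module io.picolabs.subscription alias subscription
    shares wellKnownEci
  }
  global {
    // Hypothetical helper: returns the ECI that can be handed to potential subscribers
    wellKnownEci = function() {
      channel = subscription:wellKnown_Rx();
      channel{"id"}   // Assumption: the channel map stores its ECI under "id"
    }
  }
}
```

The value returned this way is what another pico would pass as the wellKnown_Tx attribute when requesting a subscription.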

Creating a Subscription


A wrangler:subscription event is what initiates a subscription request. You raise it within the pico that is going to send the request to the other pico. In the event you give attributes that describe the type of subscription you want to create. Any attributes added to this event will be seen by the pico receiving the request, which allows that pico to receive meta information about the requesting pico.

Requesting to Subscribe

raise wrangler event "subscription" attributes {
  "name":"self_subscription",              // The name of the subscription you wish to create (optional)
  "wellKnown_Tx":"NWStbmANrMp9B6ucmHCWtv"  // The wellKnown_Rx ECI of the pico you wish to send the request to (the only required attribute)
}

As soon as the above event is raised, Wrangler will initiate the process. If all goes well, the receiving pico will have a wrangler:inbound_pending_subscription_added event raised within it.


Accepting a Subscribe Request

To accept (or decline) a subscription request, the receiving pico should listen for the wrangler:inbound_pending_subscription_added event. This event provides meta information about the type of subscription the other pico wants to form, as well as any attributes that were passed to the wrangler:subscription event that initiated the subscription.

To accept the subscription, the receiving pico need only raise the wrangler:pending_subscription_approval event in response to the wrangler:inbound_pending_subscription_added event, with either the "Id" of the soon-to-be subscription or the ECI of the Rx that this pico will receive subscription messages on. Both of these are already attributes of the wrangler:inbound_pending_subscription_added event, so a simple raise wrangler event "pending_subscription_approval" attributes event:attrs; works just fine.

Accepting a Request
/*This rule auto accepts any subscription request that this pico receives*/
rule autoAcceptSubscriptions {
  select when wrangler inbound_pending_subscription_added
  always {
    raise wrangler event "pending_subscription_approval" attributes event:attrs; // Simplified and idiomatic subscription acceptance
  }
}

/*This rule checks if the subscription request is what this pico is looking for before accepting*/
rule acceptSensorSubscriptions {
  select when wrangler inbound_pending_subscription_added
  pre {
    subscriptionHasMyAttr = event:attr("temp_sensor_ID")  // Does the request have a temp_sensor_ID attribute?
  }
  if subscriptionHasMyAttr then                           // If it does, then fire the rule
  noop()
  fired {
    raise wrangler event "pending_subscription_approval" attributes event:attrs; // Simplified and idiomatic subscription acceptance
  }
}

/*
Also valid ways of accepting a subscription
*/
raise wrangler event "pending_subscription_approval" attributes {
  "Id":event:attr("Id")
};

raise wrangler event "pending_subscription_approval" attributes {
  "Rx":event:attr("Rx")
};

Reacting to Subscription Creation or a Cancelled Request

As soon as a subscription is created, a KRL developer may want to save that subscription's info so that they can start using the subscription for communication, or immediately send a message on the subscription. The developer may react to the wrangler:subscription_added event for this purpose. The event's attributes describe the new subscription and include any attributes that were passed through the subscription creation event chain.

Reacting to New Subscription
/* Our rule that reacts to a new subscription being added and records it in an entity variable */
rule newSubAdded {
  select when wrangler subscription_added
  pre {
    subID = event:attr("Id")                // The ID of the subscription is given as an attribute
    subInfo = event:attr("bus")             // The relevant subscription info is given in the "bus" attribute
  }
  always {
    ent:subs{subID} := subInfo              // Record the sub info in this ruleset so we can use it
  }
}

/* This rule sends an event to each subscription we know about when a new subscription is added */
rule sendMsgToSubs {
  select when wrangler subscription_added
  foreach ent:subs.values() setting (sub)
  event:send({
    "eci":sub{"Tx"},                        // The "Tx" in the subscription info is the other pico's receiving channel (Rx), which takes our events and queries!
    "eid":"sending_info",                   
    "domain":"example",                     // The domain of our event (needs to be allowed by subscription policy)
    "type":"example_message",               // The event type of our event
    "attrs":{                               // The attrs of our event
      "message":"Hello subscribers!"
    }
  })
}

Advanced Subscription Creation Example

This example is closer to what safe, idiomatic code might look like for creating a relationship between picos using subscriptions. It uses different subscription attributes to identify and manipulate the new subscription. Assume the ruleset that these rules are written in is installed on both picos.

Advanced Example
/* 
  Given this rule to start the subscription
  Initiate the subscription with a peer
*/
rule connectToPeer {
  select when peers connect_to_peer
  pre {
    peerLoc = event:attr("peerLoc")
  }
  always {
    raise wrangler event "subscription" attributes {
      "name":"info_peer_sub",
      "channel_type":meta:rid,        // use our rid as a channel type domain
      "wellKnown_Tx": peerLoc,
      "peer_id":ent:peerId,           // Use this pico's peer ID
      "Rx_role":"node",
      "Tx_role":"node"
    }
  }
}

rule autoAcceptSubscriptions {
  select when wrangler inbound_pending_subscription_added
  if event:attr("name") == "info_peer_sub" && event:attr("channel_type") == meta:rid then // Accept only info_peer_sub requests whose channel_type matches our rid
  noop()
  fired {
    raise wrangler event "pending_subscription_approval" attributes event:attrs.put("other_peer_id", ent:peerId); // Approve
  }
}
/*
  Once the subscription is accepted subscription_added is raised in both picos
*/
rule newPeerAdded {
  select when wrangler subscription_added
  pre {
    peerId = event:attr("peer_id") == ent:peerId => event:attr("other_peer_id") | event:attr("peer_id") // Get the peer ID we want
    isPeer = peerId                                     // If this is a peer 
             && event:attr("Rx_role") == "node"         // And we're both nodes
             && event:attr("Tx_role") == "node"
    subID = event:attr("Id")                            // Record the ID of the subscription
    subInfo = event:attr("bus").put("peer_id", peerId)  // Store the subscription information with the peer_id in a binding.
  }
  if isPeer then
  noop()
  fired {
    ent:peers{subID} := subInfo                         // Store the peer subscription info (peer_id already added above) in an entity map so we can interact with it
  }
}

Deleting a Subscription


To delete or cancel a subscription the KRL developer raises a wrangler:subscription_cancellation event with attributes that describe the target subscription to cancel. As a result, a wrangler:subscription_removed event will be raised that the developer can react to.

Starting Subscription Cancellation
rule cancelSub {
  select when example cancelSub
  always {
    raise wrangler event "subscription_cancellation" attributes {
      "Id":"cjy0tf9xu009mtqbzdx6wcelr" // The ID of the subscription can be used
    }
  }
}

Reacting to a Subscription Cancellation
/*Turns a pico white in the UI when a subscription has been removed*/
rule whenSubCancelled {
  select when wrangler subscription_removed
  always {
    raise engine_ui event "box" attributes {
      "backgroundColor":"#FFFFFF"
    }
  }
}

Sending Messages Between Subscribers


Manually Sending Messages

At any time the KRL developer can get an array of all established subscriptions by querying the established() function in the io.picolabs.subscription ruleset. Each member of the array will have a map giving all needed information about the subscription (descriptions of these attributes are given in the Subscription Attributes section). The "Tx" attribute in this map is the ECI of the other pico's channel for receiving events and queries. The developer can use this value with event:send() to send events valid for the channel to the pico, or send queries using skyQuery/HTTP.

However, using the established() function is not necessary. A developer could manage subscriptions that only they care about by saving the subscription information when a new subscription is created by reacting to the wrangler:subscription_added event.

Attempt to Send an Event to Every Subscriber
/* In the meta block: "use module io.picolabs.subscription alias subscription" */
/* This rule attempts to send an event to each subscription this pico has */
rule sendMsgToSubs {
  select when example send_msg
  foreach subscription:established() setting (sub)
  event:send({
    "eci":sub{"Tx"},                        // The "Tx" in the subscription info is the other pico's receiving channel (Rx), which takes our events and queries!
    "eid":"sending_info",                   
    "domain":"example",                     // The domain of our event (needs to be allowed by subscription policy)
    "type":"example_message",               // The event type of our event
    "attrs":{                               // The attrs of our event
      "message":"Hello subscribers!"
    }
  })
}

Send a Query to a Subscriber given Subscription ID
/* In the meta block: "use module io.picolabs.subscription alias subscription" */
/* Given a valid Sub ID get the name of the other pico */
rule queryPeer {
  select when update get_peer_info
  pre {
    subID = "cjy4yp73q00940sbz9ocy7h5c"                                                           // Given a valid Sub ID
    peerSubs = subscription:established("Id", subID)                                              // Get established subscriptions that match that ID
    sub = peerSubs.head()                                                                         // Returned an array so we get the first sub
    peerChannel = sub{"Tx"}                                                                       // Get the "Tx" channel to send our query to
    peerHost = (sub{"Tx_host"} || meta:host)                                                      // See if the pico is on a different host. If it isn't use this engine's host
    peersInfo = wrangler:skyQuery(peerChannel, "io.picolabs.wrangler", "myself", null, peerHost)  // Actually query the pico with the information we've found
    peersPicoName = peersInfo{"name"}.klog("pico name is")                                        // Use the returned query info to get the info we want
  }
}

Using an API Call

Not yet tested: the rule that reacts to a wrangler:send_event_on_subs event has not yet been tested in pico-engine version 1.0.0.


The subscription ruleset also provides an API event that the KRL developer can raise to send an event on a subscription. The developer can send an event to a specific subscription ID, to all subscriptions with a certain Tx_role, to all subscriptions with a certain Rx_role, or to any combination of the three.

This rule sends an event to a specific subscription ID:

Send an Event to a specific Sub ID
/* Given a valid Sub ID tell the other pico to create a child, if the policy allows it */
rule sendEventToSubID {
  select when example send_msg
  pre {
    subID = "cjyf4v8hd00vuokbz0oz2eb1z"
  }
  always {
    raise wrangler event "send_event_on_subs" attributes {
      "domain":"wrangler",
      "type":"child_creation",
      "subID":subID,
      "attrs":{
        "name":"CreatedBySubscription"
      }
    }
  }
}

This rule sends an event to all subscriptions in which this pico's Rx_role is "node":

Send an Event to Subs with a specific Rx_role
/* Send a wrangler:new_child_request event to all subscriptions where this pico is a node */
rule send_to_all_nodes {
  select when nodes send_event_to_nodes
  always {
    raise wrangler event "send_event_on_subs" attributes {
      "domain":"wrangler",
      "type":"new_child_request",
      "Rx_role":"node",
      "attrs":{
        "name":"CreatedBySubscription"
      }
    }
  }
}

Subscription Attributes




Subscriptions come with a few related attributes that allow for different ways of using subscriptions. Italicized attributes may be NULL.

Id

The ID of a subscription identifies a subscription relationship. By default it is a globally unique identifier assigned by wrangler, but it can be set by the developer in the initial wrangler:subscription event. Both picos record the same ID for the subscription, as the ID represents the relationship itself.

Tx

The Tx attribute is the ECI of the other pico's Rx. It is the pico channel that this pico will send events and queries to.

Rx

The Rx attribute is the ECI of the channel on which this pico receives queries and events from the other pico. It is recorded as the Tx attribute on the other pico.

Tx_role

When creating a subscription you can store "roles" for each pico. This makes it easy to identify subscriptions with special types of relationships, such as a master-slave relationship between two picos. The Tx_role attribute is the role that the other pico has in the subscription relationship. Hence "Transmitting channel role".

Rx_role

When creating a subscription you can store "roles" for each pico. This makes it easy to identify subscriptions with special types of relationships, such as a master-slave relationship between two picos. The Rx_role attribute is the role that this pico has in the subscription relationship. Hence "Receiving channel role".

Tx_host

The Tx_host attribute is the network location of the engine the other pico is running on. This allows cross-engine subscriptions. If this attribute is null then the other pico is on the same engine. When manually sending messages the developer should check if this is null and use its value to send the message if it is not null. The event:send action does this check for you.
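A minimal sketch of sending on subscriptions that may live on another engine follows. It assumes that event:send accepts the target host as an optional second argument and treats a null host as "this engine"; the rule name, event domain, and message are hypothetical, and the meta block is assumed to contain "use module io.picolabs.subscription alias subscription".

```krl
/* Send an event on every subscription, passing along the recorded Tx_host */
rule sendToPossiblyRemoteSubs {
  select when example send_msg
  foreach subscription:established() setting (sub)
  event:send({
    "eci": sub{"Tx"},                       // The other pico's receiving channel
    "domain": "example",                    // Must be allowed by the channel policy
    "type": "example_message",
    "attrs": {
      "message": "Hello, wherever you are hosted!"
    }
  }, sub{"Tx_host"})                        // Assumption: a null host means the same engine
}
```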

Types of Subscriptions


Directional Subscription

A directional subscription is a subscription that stores roles, providing an easier way to identify relationships (such as master-slave) in a subscription.

To create a directional subscription request from pico A to pico B, raise a wrangler:subscription event in pico A with the additional attributes Rx_role and Tx_role. Rx_role is pico A's role in the subscription relationship, and Tx_role is pico B's role.
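As an illustration, the rule below would run in pico A and request a directional subscription. The rule name, the triggering event, the subscription name, and the role names "sensor" and "manager" are all hypothetical; only the attribute keys come from the description above.

```krl
/* Runs in pico A: request a subscription in which A is the sensor and B is the manager */
rule requestDirectionalSubscription {
  select when example connect_to_manager          // Hypothetical trigger event
  always {
    raise wrangler event "subscription" attributes {
      "name": "sensor_to_manager",                // Hypothetical subscription name
      "wellKnown_Tx": event:attr("wellKnown_Tx"), // Pico B's wellKnown_Rx ECI, supplied by the caller
      "Rx_role": "sensor",                        // Pico A's role in the relationship
      "Tx_role": "manager"                        // Pico B's role in the relationship
    }
  }
}
```

Once the subscription is established, pico B should see the roles mirrored: its Rx_role would be "manager" and its Tx_role "sensor". If established() filters on role keys the same way it filters on "Id", pico A could then find its managers with subscription:established("Tx_role", "manager").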

Cross Engine Subscription

It is easy to imagine a situation where you want to create a subscription between picos hosted on different engines. Cross engine subscriptions allow us to do so by storing the host of the other pico in a subscription relationship.

To create a cross engine subscription between pico A and pico B, raise a wrangler:subscription event in pico A with the event attribute Tx_host. Tx_host is pico B's engine host URL, including the protocol, for example "http://localhost:8282".
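A sketch of such a request, raised in pico A, might look like the following. The rule name, triggering event, and subscription name are hypothetical, and the host value simply reuses the example URL given above.

```krl
/* Runs in pico A: request a subscription to a pico hosted on another engine */
rule requestCrossEngineSubscription {
  select when example connect_to_remote           // Hypothetical trigger event
  always {
    raise wrangler event "subscription" attributes {
      "name": "remote_peer",                      // Hypothetical subscription name
      "wellKnown_Tx": event:attr("wellKnown_Tx"), // Pico B's wellKnown_Rx ECI, supplied by the caller
      "Tx_host": "http://localhost:8282"          // Pico B's engine host, including the protocol
    }
  }
}
```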

Understanding the Protocol


Subscription Event Flow

Internal API

This section is solely for illustrating how the protocol works under the hood. The internal events used are not stable and are subject to change, so long-term code should not select on them.

Subscription Creation

To establish a basic subscription between pico A and pico B:

1. Pico A receives a wrangler:subscription event which includes a wellKnown_Tx attribute. wellKnown_Tx is Pico B's wellKnown_Rx. Pico A then creates a channel for the subscription, which is stored as Rx, and raises wrangler:subscription_request_needed.

2. Pico A sends Pico B wrangler:new_subscription_request. Pico A sends all attributes it received in wrangler:subscription, with the following key attributes updated:

a. Tx - Pico A's Rx.

b. name - name used for channel.

e. channel_type - string used for channel type

3. Pico B creates an Rx channel for the subscription, which is stored as Rx along with Tx, Tx_verify_key, and Tx_public_key in the ent:inbound subscription entity variable.

4. After storing the pending subscription, Pico B raises the wrangler:inbound_pending_subscription_added event and Pico A raises the wrangler:outbound_pending_subscription_added event.
The state of each pico's subscription is static at this point. Pico A has an outgoing pending subscription, with relevant info stored in the ent:outbound subscription entity variable, and Pico B has an incoming pending subscription, with relevant info stored in the ent:inbound subscription entity variable.

5. If Pico B wants to approve the subscription request, it raises the wrangler:pending_subscription_approval event with an attribute Rx containing the Rx channel identifier of the subscription to approve. Upon successfully storing the subscription in the ent:established entity variable, Pico B:

a. sends the wrangler:outbound_pending_subscription_approved event to Pico A on Pico B's Tx for the subscription.

b. raises an API event wrangler:subscription_added.

6. Upon receiving the wrangler:outbound_pending_subscription_approved event, Pico A adds a Tx ECI to the subscription, stores the subscription in the ent:established entity variable, and raises the wrangler:subscription_added event.
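The steps above can be observed from the requesting side with a small logging rule. This is only a sketch: the rule name is hypothetical, and it makes no assumption about which attributes the event carries beyond logging whatever arrives.

```krl
/* Runs in pico A: log when a subscription request has been sent and is awaiting approval */
rule logOutboundPending {
  select when wrangler outbound_pending_subscription_added
  pre {
    // klog writes the attributes to the engine log and returns them unchanged
    requestInfo = event:attrs.klog("subscription request pending approval")
  }
}
```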


If Pico B wants to reject the subscription, it raises the wrangler:inbound_rejection event with an attribute named Rx - which is the Rx of the subscription to reject. Pico B:

a. sends the wrangler:outbound_removal event to Pico A on Pico B's Tx ECI, which then raises an API wrangler:outbound_subscription_cancelled event to itself.

b. raises the wrangler:inbound_removal event, which then raises an API wrangler:inbound_subscription_cancelled event.
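A rejection rule following this description might be sketched as below. The rule name and the acceptance criterion are hypothetical, and because these events are described here as part of the internal flow, the event name may differ between engine versions.

```krl
/* Runs in pico B: reject any request that is not the kind of subscription we expect */
rule rejectUnexpectedRequests {
  select when wrangler inbound_pending_subscription_added
  if event:attr("name") != "info_peer_sub" then   // Hypothetical acceptance criterion
  noop()
  fired {
    raise wrangler event "inbound_rejection" attributes {
      "Rx": event:attr("Rx")                      // The Rx of the pending subscription to reject
    }
  }
}
```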

If Pico A wants to revoke the subscription request, it raises the wrangler:outbound_cancellation event with an attribute named Rx, which is the Rx of the subscription request to revoke. Pico A:

a. sends the wrangler:inbound_removal event to Pico B on its wellKnown_Rx ECI, which then raises an API wrangler:inbound_subscription_cancelled event to itself.

b. raises the wrangler:outbound_removal event, which then raises an API wrangler:outbound_subscription_cancelled event.
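Revoking a pending request from the requesting side could be sketched the same way. The rule name and triggering event are hypothetical, and the caller is assumed to supply the Rx ECI of the pending outbound subscription, for example one saved when the request was made.

```krl
/* Runs in pico A: withdraw a subscription request that has not yet been approved */
rule revokePendingRequest {
  select when example revoke_request              // Hypothetical trigger event
  always {
    raise wrangler event "outbound_cancellation" attributes {
      "Rx": event:attr("Rx")                      // Rx ECI of the pending outbound subscription
    }
  }
}
```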

Subscription Deletion

  1. Either pico A or pico B raises a wrangler:subscription_cancellation event. For illustration purposes we will consider it as pico A raising the event, but the process is mirrored either way.
  2. Pico A raises a wrangler:established_removal event within itself and sends another to pico B over the Tx channel.
  3. On reaction to established_removal the channels are deleted and the subscription removed from the list of established subscriptions. wrangler:subscription_removed is then raised with relevant information.