Introducing R1FS. Unleashing Decentralized Data-Sharing.

Education

What if you could seamlessly store, discover, and retrieve data in a fully decentralized manner - without complex configurations or third-party dependencies? Meet R1fsDemoPlugin, a compact yet powerful demonstration of how Ratio1’s private IPFS-based file system (R1FS) and ChainStore (aka CSTORE) collaborate to make peer-to-peer data sharing a breeze.

WARNING: Geeky-nerdy content inside 🤯

TL;DR: we have a few workers (participants) that must “allow” each other, otherwise they will not have access to the shared in-memory decentralized database (aka ChainStore). Each time a node creates an R1FS file in our demo, it announces it via a hash set (in-memory and decentralized), so the other nodes are aware that someone shared something. R1FS creates the file and generates the content's unique identifier (CID), but that CID has to reach the other workers' apps via CSTORE.

Then repeat 😄

Why does this matter?

So, here are some concrete applications of our Ratio1 R1FS:

Decentralized federated training for custom or foundation models

Multiple nodes can securely exchange partial model weights or aggregated learning statistics via R1FS, ensuring each node’s raw data remains private. This opens doors for collaborative machine learning projects across different organizations or domains without compromising individual data sovereignty (a minimal code sketch of this pattern follows the use-case list below).

Data analytics with distributed federated processing

Rather than consolidating massive datasets in a single location, each node can store and process its local data partition, advertising the results or intermediate analytics through ChainStorage. This approach drastically reduces data transfer overhead and allows real-time, large-scale computations across a network of peers.

Community-driven EDS: Allow multiple researchers or analysts to concurrently publish their local findings (e.g., aggregated summaries or visualizations) to R1FS. Everyone benefits from insights without needing direct, bulk data transfers, boosting collaborative insights in crowdsourced analytics.

Multi-institution collaboration: Different labs or companies (even cross-disciplinary) can securely share configurations, data subsets, or progress updates. The R1FS + ChainStorage duo helps maintain a mutualized view of ongoing work, while each participant retains full control of their proprietary information.

Collaborative data collection
Deploy on multiple IoT devices or analytics nodes - each sending local stats into R1FS and announcing them. Every node sees everyone else’s data in near real-time.

Decentralized document exchange
Need to exchange large files or critical documents? Store them in R1FS and have your network retrieve them directly from whichever node has them - no single point of failure.

Cross-node configuration
Share configuration or state files across distributed systems. One node’s update automatically becomes available to all others, ensuring frictionless synchronization.
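
To make these use cases concrete, below is a minimal sketch of the federated-training pattern described above: a node stores a model update in R1FS, announces its CID, and peers collect the updates announced for the same round. The add_pickle() signature is assumed to mirror add_yaml(), and the round-specific hkey is invented for illustration - treat this as a sketch, not confirmed API.

def share_model_update(self, model_weights: dict, round_id: int):
  """ Sketch only: store a partial model update in R1FS and announce its CID.
      Assumes add_pickle() mirrors add_yaml()'s signature. """
  # store only the update - the raw training data never leaves this node
  cid = self.r1fs.add_pickle(model_weights, fn=f"update_round_{round_id}")
  # announce under a hypothetical round-specific namespace so peers can find it
  self.chainstore_hset(hkey=f'fed-train-round-{round_id}', key=self.my_id, value=cid)
  return cid

def collect_model_updates(self, round_id: int):
  """ Sketch only: fetch every peer's announced update for this round. """
  announced = self.chainstore_hgetall(f'fed-train-round-{round_id}') or {}
  files = []
  for worker_id, cid in announced.items():
    if worker_id == self.my_id:
      continue  # skip our own announcement
    files.append(self.r1fs.get_file(cid))  # local path to the retrieved file
  return files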

What Is the R1fsDemoPlugin?

The R1fsDemoPlugin is a demonstration plugin designed to:

  1. Generate random data and store it in R1FS, Ratio1's private, IPFS-like storage system.

  2. Publish the resulting CID (Content Identifier) of that data to the Ratio1 ChainStore (hash-set), allowing all participating peers to discover it.

  3. Periodically watch for new CIDs published by other peers, retrieve the corresponding files, and process them (in our example, a simple YAML dump).

In short, it’s a straightforward illustration of how to broadcast data - be it YAML-based or any other type of file - to a distributed network where everyone can find and download it. Whether you’re spinning up one node or an entire swarm, R1fsDemoPlugin ensures that all participants can both announce and consume each other’s shared data in near real-time.

1. Storing Data in R1FS

When you want to share data:

  • Randomized Info: The plugin generates a UUID plus a random integer.

  • Serialization: That info is serialized to YAML.

  • Commit to R1FS: Using r1fs.add_yaml(), it’s stored in R1FS - instantly returning a CID to reference the newly added content.

cid = self.r1fs.add_yaml(data)

Note: R1FS supports various formats and methods, from the basic add_file() and get_file() to add_json(), add_pickle(), etc. For the demo, we’re using YAML for clarity, but the concept is identical for other data formats.
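
For example, switching to the JSON or pickle helpers could look roughly like the snippet below. The exact signatures of add_json() and add_pickle() are assumed here to mirror add_yaml() and may differ in your Edge Node version:

# assumption: add_json()/add_pickle() accept the same arguments as add_yaml()
data = {'some_key': self.uuid(), 'owner_id': self.my_id}
cid_json = self.r1fs.add_json(data, fn='demo_payload')       # JSON instead of YAML
cid_pkl = self.r1fs.add_pickle(data, fn='demo_payload_pkl')  # arbitrary Python objects
local_path = self.r1fs.get_file(cid_json)                    # retrieval is format-agnostic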

2. Announcing via ChainStorage

Once a piece of data is safely tucked away in R1FS, its CID is announced to the network:

self.chainstore_hset(hkey='r1fs-demo', key=self.my_id, value=cid)
  • hkey='r1fs-demo' functions like a namespace for the key-value store (just like a hash set).

  • key=self.my_id is the plugin’s unique identifier (we often use the node alias or instance ID).

  • value=cid is the pointer to the actual data stored in R1FS.

Now, any peer can run chainstore_hgetall('r1fs-demo') to fetch all announced CIDs, including the one just published.
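
The result is a plain dictionary mapping each announcer's key to the CID it published. The worker IDs and CID values below are made up purely for illustration:

dct_data = self.chainstore_hgetall('r1fs-demo')
# dct_data might now look like (illustrative values only):
# {
#   'r1:node-alpha': 'Qm...CidAnnouncedByNodeAlpha',
#   'r1:node-beta' : 'Qm...CidAnnouncedByNodeBeta',
# }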

3. Discovering & Downloading Remote CIDs

Periodically, the plugin checks for CIDs announced by others:

dct_data = self.chainstore_hgetall('r1fs-demo')

It then filters out its own announcements (to avoid re-fetching the data it just created) and retrieves each remaining file using:

fn = self.r1fs.get_file(cid)

If the file is YAML, the plugin dumps the content to the logs (or does something more interesting in a real-world scenario). R1FS handles the underlying peer-to-peer content exchange, ensuring the file is fetched from whichever node advertises it.
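
Condensed from the full plugin code at the end of this post, the discover-and-retrieve step boils down to roughly this sketch (error handling omitted):

dct_data = self.chainstore_hgetall('r1fs-demo') or {}
# keep only CIDs announced by other workers that we have not fetched before
new_cids = [
  cid for key, cid in dct_data.items()
  if key != self.my_id and cid not in self.__known_cids
]
self.__known_cids.extend(new_cids)
for cid in new_cids:
  fn = self.r1fs.get_file(cid)               # R1FS handles the peer-to-peer fetch
  if fn.endswith(('.yaml', '.yml')):
    data = self.diskapi_load_yaml(fn)        # the demo simply loads and logs the YAML
    self.P(self.json_dumps(data, indent=2))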

The Main process() Loop

Every PROCESS_DELAY seconds (15 in the demo configuration shown at the end of this post, but configurable), the process() method is called. It does two critical things:

  1. share_local_data(): Checks if it’s been an hour since the last data share. If so, it creates new random data, stores it in R1FS, and publishes the CID via the ChainStore.

  2. show_remote_shared_data(): Looks up new CIDs published by other peers, retrieves them, and logs the data for visibility.

This ensures that every node in the system gently trickles out new data and picks up fresh content from others, making the entire network’s data flow both fluid and robust.
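
Two knobs control this cadence in the demo plugin (shown in full at the end of this post): PROCESS_DELAY in the plugin's _CONFIG, which sets how often process() is called, and the hard-coded 3600-second threshold inside share_local_data(). If you fork the plugin, slowing things down could look like this sketch:

# sketch of a forked demo config: run process() once a minute, share every 10 minutes
# assumes: from naeural_core.business.base import BasePluginExecutor as BasePlugin (as in the full code)
_CONFIG = {
  **BasePlugin.CONFIG,
  'PROCESS_DELAY'     : 60,   # seconds between process() calls
  'INITIAL_WAIT'      : 15,
  'ALLOW_EMPTY_INPUTS': True,
  'VALIDATION_RULES'  : {**BasePlugin.CONFIG['VALIDATION_RULES']},
}

SHARE_INTERVAL = 600  # use this instead of the hard-coded 3600 in share_local_data()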

Putting It All Together

Below is an example pipeline (think of it as a Ratio1 “app”) that directs each node to run this plugin. With just one step - launch the pipeline on at least two nodes - you’re already set to see data seamlessly shared across nodes.

{
  "NAME": "r1fs_demo_pipeline",
  "PLUGINS": [
      {
          "INSTANCES": [
              {
                  "INSTANCE_ID": "DEFAULT"
              }
          ],
          "SIGNATURE": "R1FS_DEMO"
      }
   
  ],
  "TYPE": "Void"
}

Simply load this pipeline on any two (or more!) Ratio1 nodes to start the R1fsDemoPlugin on each. Once running, the plugin will take care of storing random data, announcing the CID, and discovering what others share. You can also programmatically launch a pipeline using Python code.

Note: Always make sure your SDK is paired with your node

After updating your SDK via

pip install --upgrade ratio1

Then run a quick configuration check using

r1ctl config show

You will see output that includes your SDK client's 0xai address.

Then you just copy the 0xai internal address and give it to your node via the container add_allowed command:

docker exec r1node add_allowed 0xai_xxxxxxxxxxxxxxxxxxxxxxxxxx

Here, 0xai_xxxxxxxxxxxxxxxxxxxxxxxxxx is the address copied from the r1ctl config show output on your client. As a result, you should see output in which your client address clearly appears in the node's allowed-addresses whitelist. If this command fails, it means your node is not running, or it is running under a Docker container name other than r1node (only the case for custom installs).

Note: running the docker exec r1node add_allowed or the docker exec r1node get_node_info commands will show a list of several (quite a few actually) nodes that are allowed on your node by default - these are the oracles that supervise your node and make sure you get your Proof-of-Availability rewards as well as your Proof-of-AI rewards for the specific jobs you receive.

For this tutorial, make sure you have allowed the SDK and the Edge Nodes to communicate. Follow the allowance scheme below.

[Diagram: allowance scheme between the Ratio1 SDK and the Edge Nodes]

With the allowance in place, the following Python client script deploys the r1fs_demo_pipeline to two nodes and listens for the payloads their plugin instances emit:

import json
from ratio1 import Instance, Payload, Pipeline, Session, PAYLOAD_DATA
class SimplePayloadHandler:
  """ Simple payload handler class build for convenience """
  def __init__(self):
    self.n_received = 0
    return
 
  def instance_on_data(self, pipeline: Pipeline, payload: Payload):
    # we extract the payload and do a minimal processing
    # this is the payload key sent by the demo plugin
    R1FS_DEMO_DATA_KEY = "R1FS_DATA"
    # next convert to the data object for convenience
    data = payload.data
    # then we extract the sender alias directly from the payload
    sender_alias = data[PAYLOAD_DATA.EE_ID]
    # we extract the r1fs data if it is present
    r1fs_data = data.get(R1FS_DEMO_DATA_KEY)
    if r1fs_data is None:
      # we ignore the payload if the data is not present
      return
    # we increment the number of received payloads
    self.n_received += 1
    # then we extract the R1FS file originator (creator) from the demo data
    read_file_originator = r1fs_data["owner_id"]
    # we print the data
    pipeline.P("Data received #{} from worker '{}' from file created by '{}':\n {}".format(
      self.n_received, sender_alias, read_file_originator, json.dumps(r1fs_data, indent=2)
    ))
    return
if __name__ == '__main__':
  session: Session = Session()
 
  payload_handler = SimplePayloadHandler()
 
  # this code assumes the nodes have "allowed" the SDK to deploy the pipeline
  nodes = [
    '0xai_A2LfyeItL5oEp7nHONlczGgwS3SV8Ims9ujJ0soJ6Anx',
    '0xai_AqgKnJMNvUvq5n1wIin_GD2i1FbZ4FBTUJaCI6cWf7i4',
  ]
  for node in nodes:
    session.P(f"Deploying pipeline to node: {node}")
    session.wait_for_node(node) # we wait for the node to be ready
    pipeline: Pipeline = session.create_pipeline(
      node=node, name='r1fs_demo_pipeline', data_source='Void',
      debug=True,
    )
    # The idea is that we create a plugin instance that listens for data
    # on ChainStorage from other plugins/nodes that create R1FS demo files
    # while itself also creating R1FS demo files
    instance: Instance = pipeline.create_plugin_instance(
      signature='R1FS_DEMO', on_data=payload_handler.instance_on_data,
      instance_id='inst01',
      debug=True,
    )
    pipeline.deploy()
  WAIT_TIME = 150
  session.P(f"All pipelines deployed, we wait for {WAIT_TIME} seconds...")
  session.wait(
    seconds=WAIT_TIME,      # we wait the session for WAIT_TIME seconds
    close_pipelines=True,   # we close the pipelines after the wait
    close_session=True,     # we close the session after the wait
  )
 
  session.P("Session closed. A total of {} payloads were received.".format(
    payload_handler.n_received
  ))
  if payload_handler.n_received == 0:
    session.P("No payloads were received. Probably your nodes do not cooperate ('allow') with each other.")
  session.P("Main thread exiting...")

After a short wait, you’ll see the plugin logs start streaming details about new data being stored and discovered. It’s a trivial example that packs a big punch in demonstrating decentralized workflows.

Conclusion

The R1fsDemoPlugin perfectly showcases how R1FS and ChainStorage can unify to create a frictionless, fully decentralized data-sharing environment. Whether you’re dealing with simple YAML blobs or entire directories, the principle remains the same:

  1. Store your content in a swarm-based file system.

  2. Broadcast its location in a distributed key-value store.

  3. Synchronize among peers - everyone picks up what they need, when they need it.

Give it a spin, watch the logs, and see how elegantly your nodes learn about and acquire each other’s data. As you gain confidence, you’ll find it easy to layer more complex features like tokens, advanced access rules, or concurrency on top of the same basic approach.

Ready to try it yourself? Load the plugin, tweak the data-sharing interval, or store custom data - then sit back and let R1FS and ChainStorage do their magic.

Full R1FS Demo Plugin Code

Below is the complete code for the R1fsDemoPlugin. Feel free to copy and adapt it for your Ratio1 deployment.

Note: Please be aware that this plugin code is already bundled within the Ratio1 Edge Nodes as a “template” plugin so you do not need to execute, try or do anything with it. The plugin itself cannot and will not run outside the Edge Node ecosystem/environment.

from naeural_core.business.base import BasePluginExecutor as BasePlugin
__VER__ = '0.1.0.0'
_CONFIG = {
  # mandatory area
  **BasePlugin.CONFIG,
  # our overwritten props
  'PROCESS_DELAY' : 15,  
  'INITIAL_WAIT'  : 15,
  # due to the fact that we are using a "void" pipeline,
  # we need to allow empty inputs as we are not getting any
  # data from the pipeline
  'ALLOW_EMPTY_INPUTS': True,
  'VALIDATION_RULES' : {
    **BasePlugin.CONFIG['VALIDATION_RULES'],    
  },  
}
class R1fsDemoPlugin(BasePlugin):
 
  def on_init(self):
    # we store a unique ID for this worker (instance) assuming it is unique
    self.my_id = f'r1:{self.ee_id}' # node alias is just a naive approach
    self.__file_send_time = 0 # last time we sent a file
    self.__known_cids = [] # keep track of known CIDs
    self.__start_time = self.time() # start time of the plugin
    self.__r1fs_demo_iter = 0 # iteration counter
    self.P(f"Starting R1fsDemoPlugin v{__VER__} with ID: {self.my_id}. Plugin instance will now wait for {self.cfg_initial_wait} sec")
    return
 
  def __save_some_data(self):
    """ Save some data to the R1FS """
    self.P("Saving some data...")
    uuid = self.uuid() # generate some random data
    value = self.np.random.randint(1, 100) # even more random data
    data = {
      'some_key': uuid,
      'other_key': value,
      'owner_id' : self.my_id,
      'owner_key' : self.full_id
    }
    filename = f"{self.ee_id}_{self.__r1fs_demo_iter}"
    cid = self.r1fs.add_yaml(data, fn=filename)
    self.P(f"Data saved with CID: {cid}")
    return cid
 
  def __announce_cid(self, cid):
    """ Announce the CID to the network via ChainStore hsets"""
    self.P(f'Announcing CID: {cid} for {self.my_id}')
    self.chainstore_hset(hkey='r1fs-demo', key=self.my_id, value=cid)
    return    
 
  def __get_announced_cids(self):
    """ Get all announced CIDs except our own from ChainStore hsets"""
    cids = []
    self.P("Checking for any announced CIDs...")
    # get full dictionary of all announced CIDs under the key 'r1fs-demo'
    # we assume all demo instances are using the same hkey and their own key
    dct_data = self.chainstore_hgetall('r1fs-demo')
    self.P(f"Extracted hset data (I am {self.my_id}):\n {self.json_dumps(dct_data, indent=2)}")
    if dct_data:
      # extract all the CIDs except our own
      cids = [
        v for k, v in dct_data.items() if k != self.my_id
      ]
      # now we filter based on already known CIDs
      cids = [
        cid for cid in cids if cid not in self.__known_cids
      ]
    if len(cids) > 0:
      self.P(f"Found {len(cids)} CIDs ")
      self.__known_cids.extend(cids)
    return cids
 
  def share_local_data(self):
    """ Share some data with the network """
    cid = None  # stays None when it is not yet time to share again
    if self.time() - self.__file_send_time > 3600:
      self.P("Sharing data...")
      cid = self.__save_some_data()
      self.__announce_cid(cid)
      self.__file_send_time = self.time()  # remember when we last shared
    return cid
 
  def show_remote_shared_data(self):
    """ Retrieve and process shared data """
    cids = self.__get_announced_cids()
    self.P(f"Found {len(cids)} shared data...")
    for cid in cids:
      self.P(f"Retrieving: {cid}")
      fn = self.r1fs.get_file(cid)
      self.P(f"Retrieved: {fn}")
      if fn.endswith('.yaml') or fn.endswith('.yml'):
        data = self.diskapi_load_yaml(fn, verbose=False)        
        self.P(f"Loaded:\n {self.json_dumps(data, indent=2)}")
        self.P("Delivering the data to potential consumers...")
        self.add_payload_by_fields(
          r1fs_data=data,
        )
      else:
        self.P(f"Received unsupported file: {fn}", color='r')
    # end for each CID
    return
   
  def process(self):
    if self.time() - self.__start_time < self.cfg_initial_wait:
      self.P(f"Waiting for {self.cfg_initial_wait} sec to start processing...")
      return
    self.__r1fs_demo_iter += 1
    self.P(f'R1fsDemoPlugin is processing iter #{self.__r1fs_demo_iter}')
    self.share_local_data()
    self.show_remote_shared_data()
    self.P('R1fsDemoPlugin is done.')
    return

That’s it! The R1fsDemoPlugin provides a neat, self-contained example of the decentralized power offered by Ratio1’s ecosystem. Happy experimenting - and welcome to the future of peer-to-peer data sharing.

Andrei Ionut Damian

Feb 21, 2025

The Ultimate AI OS Powered by Blockchain Technology

©Ratio1 2025. All rights reserved.