Distributed Prime Number Calculation Tutorial using PyE2 SDK

Prime Number Calculation

In this tutorial, we will explore how to calculate prime numbers both locally and on a distributed network using the PyE2 SDK. We will walk through setting up the development environment, running the prime number calculation locally, and deploying the same task to multiple edge nodes in a distributed manner for improved performance.

Setup

Before we dive into the actual code, we need to ensure the proper setup of the development environment and SDK.

1. Install the SDK

To get started, install the PyE2 SDK using pip:

pip install PyE2

2. Configure the Environment

You need to configure a .env file that contains the necessary credentials and environment variables. You can use the provided example file as a template:

cp tutorials/.example_env .env

Make sure to fill in the required fields in the .env file with appropriate values.

3. Manage Private Keys

For security, never publish sensitive information such as private keys. To experiment with this tutorial on a test network, you can use a provided test key. Alternatively, you can generate a new private key using the SDK, which will store it in your working directory.

If using a provided private key, copy the example file from tutorials/_example_pk_sdk.pem to _local_cache/_data/, renaming it to _pk_sdk.pem.
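The copy-and-rename step can also be scripted. The following is a minimal sketch, assuming the script runs from the repository root so that the relative paths quoted above resolve; the helper name install_test_key is hypothetical and not part of the SDK.

```python
import shutil
from pathlib import Path


def install_test_key(src="tutorials/_example_pk_sdk.pem",
                     dest_dir="_local_cache/_data",
                     dest_name="_pk_sdk.pem"):
    """Copy the example key into the local cache under the expected file name.

    Hypothetical helper: the default paths are the ones quoted in this tutorial
    and are assumed to be relative to the current working directory.
    """
    src_path = Path(src)
    if not src_path.is_file():
        raise FileNotFoundError(f"Example key not found: {src_path}")

    dest_path = Path(dest_dir) / dest_name
    # Create _local_cache/_data if it does not exist yet.
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(src_path, dest_path)
    return dest_path
```

Calling install_test_key() with no arguments performs the same copy as the manual step; passing explicit paths makes it usable from other working directories.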

Local Execution: Finding Prime Numbers

To start, we will write a program that finds all 168 prime numbers below 1000. The code runs on a local machine and uses a thread pool to check candidate numbers concurrently.
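As a sanity check on the target of 168, the count can be confirmed deterministically. The sketch below uses the sieve of Eratosthenes, which is a different technique from the random brute-force search in this tutorial; the function name primes_below is illustrative.

```python
def primes_below(limit):
    """Return all primes p with 2 <= p < limit using the sieve of Eratosthenes."""
    if limit < 3:
        return []
    is_candidate = [True] * limit
    is_candidate[0] = is_candidate[1] = False
    for n in range(2, int(limit ** 0.5) + 1):
        if is_candidate[n]:
            # Cross out every multiple of n, starting at n * n.
            for multiple in range(n * n, limit, n):
                is_candidate[multiple] = False
    return [n for n, flag in enumerate(is_candidate) if flag]


primes = primes_below(1000)
print(len(primes), primes[:5], primes[-1])  # 168 [2, 3, 5, 7, 11] 997
```

The result gives a reference list against which the output of the randomized search can be compared.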

Prime Number Generation Logic

Here is the Python code to generate prime numbers locally:

import numpy as np
from concurrent.futures import ThreadPoolExecutor

def local_brute_force_prime_number_generator():
    def is_prime(n):
        if n <= 1:
            return False
        for i in range(2, int(np.sqrt(n)) + 1):
            if n % i == 0:
                return False
        return True

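    # Draw 20 random candidates from 1..999 (the upper bound is exclusive).
    random_numbers = np.random.randint(1, 1000, 20)
    # Check the candidates on 4 threads; the context manager shuts the pool down afterwards.
    with ThreadPoolExecutor(max_workers=4) as thread_pool:
        are_primes = list(thread_pool.map(is_prime, random_numbers))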

    prime_numbers = []
    for i in range(len(random_numbers)):
        if are_primes[i]:
            prime_numbers.append(random_numbers[i])

    return prime_numbers

The function local_brute_force_prime_number_generator draws a random sample of 20 integers from 1 to 999 and checks each one for primality by trial division up to its square root. It uses ThreadPoolExecutor to spread the checks across four threads and returns only the numbers that turned out to be prime.
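The primality check itself can be tried in isolation. The following is a small variant of is_prime, assuming only the standard library: it uses math.isqrt for an exact integer square root instead of np.sqrt, which is a substitution made here for illustration and not the tutorial's own code.

```python
import math


def is_prime(n: int) -> bool:
    """Trial division: n is prime if no integer in 2..isqrt(n) divides it."""
    if n <= 1:
        return False
    for divisor in range(2, math.isqrt(n) + 1):
        if n % divisor == 0:
            return False
    return True


print([n for n in range(1, 30) if is_prime(n)])
# [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```

Stopping at the square root is sufficient because any factor larger than the square root must pair with one smaller than it.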

We call this function repeatedly until we have collected all 168 unique prime numbers, as shown below:

if __name__ == "__main__":
    found_so_far = []
    print_step = 0

    while len(found_so_far) < 168:
        prime_numbers = local_brute_force_prime_number_generator()

        for prime_number in prime_numbers:
            if prime_number not in found_so_far:
                found_so_far.append(prime_number)

        if print_step % 50 == 0:
            print(f"Found so far: {len(found_so_far)}: {sorted(found_so_far)}\n")

        print_step += 1

    print(f"Found all primes: {len(found_so_far)}: {sorted(found_so_far)}")

In this code, we continuously compute new prime numbers and only add them to our list if they haven’t been discovered yet. The program prints progress every 50 iterations.
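The same collection loop can be written with a set, which handles the uniqueness check implicitly. This is a minimal sketch that swaps in a standard-library stand-in for the generator (sample_primes is a hypothetical name) so that it runs without NumPy; the batch size and range mirror the values used above.

```python
import math
import random


def is_prime(n):
    if n <= 1:
        return False
    return all(n % d for d in range(2, math.isqrt(n) + 1))


def sample_primes(batch_size=20, low=1, high=1000):
    """Stand-in for the tutorial's generator: primes among random ints in [low, high)."""
    batch = [random.randrange(low, high) for _ in range(batch_size)]
    return [n for n in batch if is_prime(n)]


def collect_primes(target=168):
    """Keep sampling batches until `target` distinct primes have been seen."""
    found = set()
    iterations = 0
    while len(found) < target:
        found.update(sample_primes())  # duplicates are dropped by the set
        iterations += 1
    return sorted(found), iterations


primes, iterations = collect_primes()
print(f"Collected {len(primes)} primes in {iterations} batches")
```

The number of batches varies from run to run because the sampling is random.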

Remote Execution: Distributed Prime Number Calculation

While the local method works, it’s limited by the resources of a single machine. To speed up the process, we can distribute the work across multiple nodes using the PyE2 network.

Adapting Code for Distributed Execution

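To adapt the local prime number generator for remote execution, the function takes a plugin argument and reaches its dependencies through it: plugin.np replaces the NumPy import, and plugin.threadapi_map replaces ThreadPoolExecutor: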

from PyE2 import CustomPluginTemplate

def remote_brute_force_prime_number_generator(plugin: CustomPluginTemplate):
    def is_prime(n):
        if n <= 1:
            return False
        for i in range(2, int(plugin.np.sqrt(n)) + 1):
            if n % i == 0:
                return False
        return True

    random_numbers = plugin.np.random.randint(1, 1000, 20)
    are_primes = plugin.threadapi_map(is_prime, random_numbers, n_threads=4)

    prime_numbers = []
    for i in range(len(random_numbers)):
        if are_primes[i]:
            prime_numbers.append(random_numbers[i])

    return prime_numbers
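Before deploying, the adapted function can be exercised locally by passing it a stand-in object. The sketch below is an assumption-heavy illustration: FakePlugin is hypothetical and not part of PyE2, it mimics only the two attributes the function uses, and it assumes threadapi_map returns results in input order (which the indexing in the function above relies on).

```python
import math
import random
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace


class FakePlugin:
    """Hypothetical local stand-in for the plugin object; NOT the PyE2 class."""

    def __init__(self):
        # Only the two NumPy calls used by the worker function are imitated.
        self.np = SimpleNamespace(
            sqrt=math.sqrt,
            random=SimpleNamespace(
                randint=lambda low, high, size: [
                    random.randrange(low, high) for _ in range(size)
                ]
            ),
        )

    def threadapi_map(self, func, iterable, n_threads=1):
        # Assumed behaviour: apply func to each item, results in input order.
        with ThreadPoolExecutor(max_workers=n_threads) as pool:
            return list(pool.map(func, iterable))


def remote_brute_force_prime_number_generator(plugin):
    # Same body as the tutorial's function, without the PyE2 type annotation.
    def is_prime(n):
        if n <= 1:
            return False
        for i in range(2, int(plugin.np.sqrt(n)) + 1):
            if n % i == 0:
                return False
        return True

    random_numbers = plugin.np.random.randint(1, 1000, 20)
    are_primes = plugin.threadapi_map(is_prime, random_numbers, n_threads=4)
    return [n for n, ok in zip(random_numbers, are_primes) if ok]


batch = remote_brute_force_prime_number_generator(FakePlugin())
print(batch)
```

A dry run like this catches plain Python errors early; it says nothing about how the real plugin behaves on a node.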

Connecting to the Network

Once the code is adapted for remote execution, we need to connect to the PyE2 network to discover available edge nodes. We do this by creating a session and using the on_heartbeat callback to list online nodes:

from PyE2 import Session
from time import sleep

def on_heartbeat(session: Session, node_addr: str, heartbeat: dict):
    session.P(f"{node_addr} ({heartbeat.get('EE_ID')}) is online")

if __name__ == '__main__':
    session = Session(on_heartbeat=on_heartbeat)
    sleep(15)
    session.close()
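To choose a node programmatically rather than reading the log, the callback can record what it sees. This is a minimal sketch, assuming the callback signature and the EE_ID heartbeat field shown above; the online_nodes dictionary and the simulated call at the end are illustrative, and the address used there is a made-up placeholder.

```python
online_nodes = {}  # node address -> EE_ID reported in the latest heartbeat


def on_heartbeat(session, node_addr, heartbeat):
    """Remember every node that has sent a heartbeat during this session."""
    online_nodes[node_addr] = heartbeat.get("EE_ID")


# Simulated heartbeat so the sketch runs without a network connection.
on_heartbeat(None, "0xai_example_address", {"EE_ID": "example-node"})
print(online_nodes)  # {'0xai_example_address': 'example-node'}
```

With a real session, the same function would be passed as Session(on_heartbeat=on_heartbeat) and the dictionary inspected after the waiting period.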

Sending a Task to a Node

Once we have identified an available node, we can send the prime number calculation task to it. This node distributes the work to worker nodes and collects their results. First, we open a session and wait for the chosen node to come online:

session = Session()
node = "0xai_Amfnbt3N-qg2-qGtywZIPQBTVlAnoADVRmSAsdDhlQ-6"  # naeural-2

session.wait_for_node(node)

Handling Partial Results

As nodes process the task, they will periodically send partial results. We need a callback function to handle these results:

from PyE2 import Pipeline

finished = False

def locally_process_partial_results(pipeline: Pipeline, full_payload):
    global finished
    found_so_far = full_payload.get("DATA")

    if found_so_far:
        pipeline.P(f"Found so far: {len(found_so_far)}: {sorted(found_so_far)}\n")

    if full_payload.get("PROGRESS") == 100:
        pipeline.P("FINISHED\n")
        finished = True
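The callback logic can be checked offline by feeding it hand-made payloads. In the sketch below, FakePipeline is a hypothetical stand-in that only records messages, the payload dictionaries are assumed shapes based on the DATA and PROGRESS keys used above, and a small state dictionary replaces the global flag purely to keep the example self-contained.

```python
class FakePipeline:
    """Hypothetical stand-in that records log messages instead of printing."""

    def __init__(self):
        self.messages = []

    def P(self, message):
        self.messages.append(message)


state = {"finished": False}


def locally_process_partial_results(pipeline, full_payload):
    found_so_far = full_payload.get("DATA")
    if found_so_far:
        pipeline.P(f"Found so far: {len(found_so_far)}: {sorted(found_so_far)}\n")
    if full_payload.get("PROGRESS") == 100:
        pipeline.P("FINISHED\n")
        state["finished"] = True


pipeline = FakePipeline()
# Assumed payload shapes, for illustration only.
locally_process_partial_results(pipeline, {"DATA": [7, 3, 5], "PROGRESS": 2})
locally_process_partial_results(pipeline, {"DATA": [7, 3, 5, 2], "PROGRESS": 100})
print(pipeline.messages)
print(state["finished"])  # True
```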

Deploying the Job

We now create and deploy the distributed task using the PyE2 API:

from PyE2 import DistributedCustomCodePresets as Presets

_, _ = session.create_chain_dist_custom_job(
    node=node,
    main_node_process_real_time_collected_data=Presets.PROCESS_REAL_TIME_COLLECTED_DATA__KEEP_UNIQUES_IN_AGGREGATED_COLLECTED_DATA,
    main_node_finish_condition=Presets.FINISH_CONDITION___AGGREGATED_DATA_MORE_THAN_X,
    main_node_finish_condition_kwargs={"X": 167},  # finish once more than 167 unique values (all 168 primes) are collected
    main_node_aggregate_collected_data=Presets.AGGREGATE_COLLECTED_DATA___AGGREGATE_COLLECTED_DATA,
    nr_remote_worker_nodes=2,
    worker_node_code=remote_brute_force_prime_number_generator,
    on_data=locally_process_partial_results,
    deploy=True
)
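The preset names suggest that the main node keeps only unique values and finishes once it holds more than X of them. The sketch below is a conceptual local simulation of that reading, not PyE2's implementation: the function names are hypothetical, and it assumes "MORE_THAN_X" means a strict greater-than comparison, which is why X is 167 for a target of 168 primes.

```python
def keep_uniques(aggregated, new_batch):
    """Append values from new_batch that are not already in aggregated."""
    for value in new_batch:
        if value not in aggregated:
            aggregated.append(value)
    return aggregated


def is_finished(aggregated, x):
    """Assumed finish rule: strictly more than x collected values."""
    return len(aggregated) > x


aggregated = []
keep_uniques(aggregated, [2, 3, 5])
keep_uniques(aggregated, [3, 5, 7])   # 3 and 5 are already present
print(aggregated)                     # [2, 3, 5, 7]

print(is_finished(list(range(167)), 167))  # False: exactly 167 is not enough
print(is_finished(list(range(168)), 167))  # True: 168 values exceed X
```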

Completing the Job

Finally, we wait for the distributed task to finish and close the session:

session.run(wait=lambda: not finished, close_pipelines=True)

Conclusion

In this tutorial, we demonstrated how to find prime numbers locally and then scale the same task to a distributed network using PyE2. By spreading the work across several edge nodes, computationally expensive tasks such as brute-force prime number generation are no longer limited by the resources of a single machine.

Andrei Ionut Damian

Dec 19, 2024
