Developing Workers#

Overview#

Workers are Python scripts that communicate with EMhub via its REST API (using Python client code). They are a good mechanism for extending EMhub functionality and for running tasks on machines other than the one running the EMhub server. The main difference between a worker and any other script using the API is that a worker is meant to run all the time (e.g., as a Linux service) and to maintain two-way communication with the EMhub server via task handling.

For example, one could run a worker on a machine that has access to the image acquisition directory. That worker could be notified (via a task) when a new session has started and where the image files are expected. From there, the worker can handle data transfer and trigger on-the-fly processing if required. It can also update the information related to that session, reflecting the progress of the associated tasks (e.g., number of files transferred, total size, data processing, etc.).

Implementing a worker will likely require some coding to address the specific needs of a given task. However, some base classes have already been implemented to help with server communication and other operations. In this section, we will go through a comprehensive example of a worker implementation that will also touch on different aspects of the system architecture.

Launching a basic OTF worker#

In this example, we will implement a simple EMhub worker that launches CryoEM on-the-fly (OTF) data processing. The workflow will use Scipion to run the pipeline and will execute the following steps: import, motioncor, ctffind and cryolo.

Installation with Scipion and Redis setup#

Let’s start by installing EMhub with Scipion, since we will need it to launch the OTF workflow.

Note

If you are using different software for the workflow, installing EMhub with Scipion is not required.

Generate the test dataset to use it as the starting point for our next steps:

emh-data --create_instance

The workers’ interactions with EMhub via tasks require more communication and concurrency than other operations. For this reason, Redis is required for in-memory storage of task information, as well as for some caching. We need to make sure that Redis is properly installed (both the server and the Python client library used by EMhub). Moreover, a redis.conf file must be present in the instance folder. Read more about it on the Caching with Redis page.

Summarizing:

# Install Redis server and client from conda
conda install -y redis redis-py -c conda-forge

# Copy Redis conf file from template
cp $EMHUB_INSTANCE/redis.conf.template $EMHUB_INSTANCE/redis.conf

# Run Redis server in background
cd $EMHUB_INSTANCE && redis-server redis.conf --daemonize yes

# Test connection with Redis server:
redis-cli -p 5001 set foo boo
emh-client redis --keys

Testing the Client and basic Worker#

Once EMhub’s server is running, we can open a new terminal and configure the environment for using a REST client. Load the EMhub environment (e.g. conda environment) and set the variables accordingly. In the test instance, there is a $EMHUB_INSTANCE/bashrc file that one can easily source:

source $EMHUB_INSTANCE/bashrc

# Check if the client is properly configured

emh-client
# Should print all EMHUB_* variables and their values

emh-client form -l all
# Should print the list of Forms from the server

Once we are sure that the client can communicate properly with the server, we need to register our machine as a possible worker. First, find out the hostname as reported by the following command:

emt-ps --hostname
c124663

Then edit the JSON of the config:hosts form (from the Forms page), adding that hostname as a key (in my case c124663), as in the following example:

{
    "c124663": {"alias": "c124663"}
}

Once the hostname is registered as a possible host, we can launch a test worker to check if it connects with the server:

python -m emhub.client.worker

If everything goes well, you should see the worker log and it should be ready for handling tasks. If we look again into the config:hosts form, now the entry should be extended with your machine hardware as reported by the worker. In my case, it looks like the following:

{
    "c124663": {
        "alias": "c124663",
        "updated": "2024-06-16 12:08:52",
        "specs": {
            "CPUs": 128,
            "GPUs": {
                "NVIDIA GeForce RTX 3090": {
                    "count": 2,
                    "memory": "24576 MiB"
                }
            },
            "MEM": 503
        },
        "connected": "2024-06-16 12:08:52"
    }
}
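The entry written by the worker is plain JSON, so it is easy to inspect from a script. The following is a minimal sketch, assuming the content of config:hosts has been copied from the Forms page into a string. The function summarize_host is a hypothetical helper written for this illustration and is not part of EMhub.

```python
import json

# Example config:hosts content, copied from the form shown above
HOSTS_JSON = """
{
    "c124663": {
        "alias": "c124663",
        "updated": "2024-06-16 12:08:52",
        "specs": {
            "CPUs": 128,
            "GPUs": {
                "NVIDIA GeForce RTX 3090": {"count": 2, "memory": "24576 MiB"}
            },
            "MEM": 503
        },
        "connected": "2024-06-16 12:08:52"
    }
}
"""


def summarize_host(name, entry):
    """Hypothetical helper: one-line summary of a config:hosts entry."""
    specs = entry.get('specs')
    if specs is None:
        # Only the alias is present until a worker connects from that host
        return f"{name}: registered, no worker has reported specs yet"
    gpus = sum(g.get('count', 0) for g in specs.get('GPUs', {}).values())
    return f"{name}: {specs.get('CPUs', 0)} CPUs, {gpus} GPUs, MEM={specs.get('MEM', 0)}"


hosts = json.loads(HOSTS_JSON)
for name, entry in hosts.items():
    print(summarize_host(name, entry))
```

An entry that only contains the alias, like the one registered by hand earlier, is reported as not yet connected.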

Now we need to navigate to the Workers page and ensure that our host is displayed there in green with a recent “Last update” value. Next, we can create a “command” task to verify that the worker processes it correctly. Click on the Create Task button and enter command as the task name and {"cmd": "ls -l /tmp/"} as the arguments. A new Task entry should then appear as Pending, and the worker terminal should acknowledge the new task and begin processing it. This task will run the provided command and report the results back in the Task history. After a while, you should be able to view the task status as done and review the results in the history.
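To make the idea of a “command” task more concrete, here is a minimal sketch of what handling such a task could look like. It is not the EMhub implementation: the function name run_command_task and the output key are assumptions made for this illustration, while the error and done keys follow the task updates used by the test worker later in this section.

```python
import subprocess


def run_command_task(args, timeout=60):
    """Hypothetical helper: run args['cmd'] in a shell and describe the outcome."""
    result = subprocess.run(args['cmd'], shell=True, capture_output=True,
                            text=True, timeout=timeout)
    # 'output' is an assumed key name; 'done' marks the task as finished
    update = {'output': result.stdout, 'done': 1}
    if result.returncode != 0:
        update['error'] = result.stderr.strip() or f"exit code {result.returncode}"
    return update


# Same arguments as entered in the Create Task dialog
print(run_command_task({"cmd": "ls -l /tmp/"}))
```

Since the command is executed through a shell, a worker like this should only accept tasks from a trusted server.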

Creating a Session to trigger OTF#

We have all the components ready to begin developing a worker for OTF data processing. If you visit the Dashboard page of your test instance, you might notice some bookings for this week. When you click on the “New Session” button, an error will occur. This is because we still need to create a dialog page for setting up a new session and establish the necessary infrastructure to handle it.

Let’s first create an $EMHUB_INSTANCE/extra folder for extra customization and copy some files we already have as examples.

mkdir $EMHUB_INSTANCE/extra

cp -r $SCIPION_HOME/source/core/emhub/extras/test/* $EMHUB_INSTANCE/extra/

This should copy the following files:

extra/templates/create_session_form.html

Template file to define the session creation dialog.

extra/data_content.py

File defining content functions to support template files, in this case create_session_form.

extra/test_worker.py

Test worker to launch OTF workflow.

Read more about EMhub customization here.

After copying the additional files, ensure that the EMhub server is stopped, along with the worker that was running in the previous section. Restart the EMhub server by running “flask run --debug” to reload the content from “$EMHUB_INSTANCE/extra”. In the worker terminal (with the client environment already configured), execute:

python $EMHUB_INSTANCE/extra/test_worker.py

Now click on the “New Session” button again from the Dashboard page to open the session dialog. Make sure to input a folder with data, specify the image pattern, and provide the gain reference image file. Also, select “Scipion” as the workflow and choose an output folder.

https://github.com/3dem/emhub/wiki/images/emhub_new_session_test.png
https://github.com/3dem/emhub/wiki/images/emhub_new_session_testFILLED.png

After creating the session, two tasks will be generated: “monitor” and “otf_test”. The “monitor” task will instruct the worker to monitor the input folder and provide information about the number of files, images, and overall folder size. The “otf_test” task will launch the OTF workflow with Scipion. Below, you can find the related session pages.

https://github.com/3dem/emhub/wiki/images/emhub_new_session_test_info.png
https://github.com/3dem/emhub/wiki/images/emhub_new_session_test_live2.png

Continue reading the next section to delve into the code of the files in “extra” and better understand the role of the underlying components.

Understanding underlying components#

Jinja2/HTML/Javascript#

In the file extra/templates/create_session_form.html, we define the HTML template for arranging the inputs in the session dialog. Additionally, we write some JavaScript code to retrieve the values input by the user and communicate with the server to create tasks related to the session, which will be handled by the worker. Let’s take a look at a code fragment from that file:

Code fragment from: extra/templates/create_session_form.html#
 1<!-- Modal body -->
 2<div class="modal-body">
 3<input type="hidden" id="booking-id" value="{{ booking.id }}">
 4
 5<!-- Create Session Form -->
 6<div class="col-xl-12 col-lg-12 col-md-12 col-sm-12 col-12">
 7    <form id="session-form" data-parsley-validate="" novalidate="">
 8        <div class="row">
 9            <!-- Left Column -->
10            <div class="col-7">
11                {{ section_header("Basic Info") }}
12
13                <!-- Some lines omitted here -->
14
15                <!-- Project id -->
16                {% call macros.session_dialog_row_content("Project ID") %}
17                    <select id="session-projectid-select" class="selectpicker show-tick" data-live-search="true" data-key="project_id">
18                            <option value="0">Not set</option>
19                            {% for p in projects %}
20                                {% set selected = 'selected' if p.id == booking.project.id else '' %}
21                                <option {{ selected }} value="{{ p.id }}">{{ p.title }}</option>
22                            {% endfor %}
23                        </select>
24                {% endcall %}
25
26                {{ section_header("Data Processing", 3) }}
27                {{ macros.session_dialog_row('Input RAW data folder', 'raw_folder', '', 'Provide RAW data folder') }}
28                {{ macros.session_dialog_row('Input IMAGES pattern', 'images_pattern', acquisition['images_pattern'], '') }}
29                {{ macros.session_dialog_row('Input GAIN image', 'gain', '', '') }}
30                {{ macros.session_dialog_row('Output OTF folder', 'otf_folder', '', '') }}

In line 3, we define a hidden input whose value is expanded to booking.id. The booking variable must be provided by the corresponding content function (create_session_form) when the template is rendered.

In line 7, we are defining a form that will help us conveniently retrieve all the values provided by the user. To achieve this, the inputs need to define the data-key value, which will be used as the key in the collected data mapping (e.g., line 17). Additionally, we are defining data-key values in lines 27 to 30 by using Jinja2 macros. These macros make it easy to generate repeating blocks of HTML template with different parameters.

The JavaScript section of this template file also plays an important role in compiling the information provided by the user and creating tasks using EMhub’s REST API.

Javascript fragment from extra/templates/create_session_form.html#
 1function onCreateClick(){
 2    var formValues = getFormAsJson('session-form');
 3    var host = formValues.host;
 4    var attrs = {
 5        booking_id: parseInt(document.getElementById('booking-id').value),
 6        acquisition: {
 7            voltage: formValues.acq_voltage,
 8            magnification: formValues.acq_magnification,
 9            pixel_size: formValues.acq_pixel_size,
10            dose: formValues.acq_dose,
11            cs: formValues.acq_cs,
12            images_pattern: formValues.images_pattern,
13            gain: formValues.gain
14        },
15        tasks: [['monitor', host]],
16        extra: {
17            project_id: formValues.project_id, raw: {}, otf: {}
18        }
19    }
20
21    // Some lines omitted here
22
23    // Validate that the OTF folder is provided if there is an OTF workflow selected
24    if (formValues.otf_folder){
25        attrs.tasks.push(['otf_test', host])
26        attrs.extra.otf.path = formValues.otf_folder;
27        attrs.extra.otf.workflow = formValues.otf_workflow;
28    }
29    else if (formValues.otf_workflow !== 'None') {
30        showError("Provide a valid <strong>OUTPUT data folder</strong> if " +
31            "doing any processing");
32        return;
33    }
34
35    var ajaxContent = $.ajax({
36        url: "{{ url_for('api.create_session') }}",
37        type: "POST",
38        contentType: 'application/json; charset=utf-8',
39        data: JSON.stringify({attrs: attrs}),
40        dataType: "json"
41    });
42
43    ajaxContent.done(function(jsonResponse) {
44        if ('error' in jsonResponse)
45            showError(jsonResponse['error']);
46        else {
47            window.location = "{{ url_for_content('session_default') }}" +
48                "&session_id=" + jsonResponse.session.id;
49        }
50    });

First, on line 2, all input values are retrieved from the form. Then, on line 6, the acquisition object is prepared as expected by the server REST endpoint. On line 15, an initial monitor task is defined, and an extra otf_test task is added on line 25 if otf_folder has a non-empty value (checked on line 24). The second element of each task is the hostname where it will be executed, which the user provides in the session form. Finally, on line 35, the AJAX request is sent to create a new session. If the request succeeds, the browser is redirected to the page of the new session (line 47); otherwise, an error is shown.
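The same payload logic can be expressed in Python, which is useful when reasoning about what the server receives. The sketch below mirrors only the JavaScript lines shown above (the omitted lines are not reproduced). The function name build_session_attrs is hypothetical, and the use of ValueError in place of showError is a choice made for this illustration.

```python
def build_session_attrs(form_values, booking_id):
    """Hypothetical Python mirror of the payload built in onCreateClick()."""
    host = form_values['host']
    attrs = {
        'booking_id': int(booking_id),
        'acquisition': {
            'voltage': form_values.get('acq_voltage'),
            'magnification': form_values.get('acq_magnification'),
            'pixel_size': form_values.get('acq_pixel_size'),
            'dose': form_values.get('acq_dose'),
            'cs': form_values.get('acq_cs'),
            'images_pattern': form_values.get('images_pattern'),
            'gain': form_values.get('gain'),
        },
        # Every session starts with a 'monitor' task on the selected host
        'tasks': [['monitor', host]],
        'extra': {'project_id': form_values.get('project_id'),
                  'raw': {}, 'otf': {}},
    }

    otf_folder = form_values.get('otf_folder')
    workflow = form_values.get('otf_workflow', 'None')
    if otf_folder:
        # An OTF folder was given: also schedule the 'otf_test' task
        attrs['tasks'].append(['otf_test', host])
        attrs['extra']['otf'] = {'path': otf_folder, 'workflow': workflow}
    elif workflow != 'None':
        raise ValueError("Provide a valid OUTPUT data folder if doing any processing")
    return attrs


example = build_session_attrs(
    {'host': 'c124663', 'otf_folder': '/tmp/otf', 'otf_workflow': 'Scipion'},
    booking_id='12')
print(example['tasks'])
```

With an OTF folder provided, the task list contains both monitor and otf_test for the chosen host; without a folder and with a workflow selected, the function raises instead of building a payload.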

The Content Function#

To render the template page, the create_session_form content function is needed, and it must provide all the data that the template requires. This function is defined in the extra/data_content.py file.

Content function in extra/data_content.py#
 1@dc.content
 2def create_session_form(**kwargs):
 3    """ Basic session creation for EMhub Test Instance
 4    """
 5    dm = dc.app.dm  # shortcut
 6    user = dc.app.user
 7    booking_id = int(kwargs['booking_id'])
 8
 9    # Get the booking associated with this Session to be created
10    b = dm.get_booking_by(id=booking_id)
11    can_edit = b.project and user.can_edit_project(b.project)
12
13    # Do some permissions validation
14    if not (user.is_manager or user.same_pi(b.owner) or can_edit):
15        raise Exception("You can not create Sessions for this Booking. "
16                        "Only members of the same lab can do it.")
17
18    # Retrieve configuration information from the Form config:sessions
19    # We fetch default acquisition info for each microscope or
20    # the hosts that are available for doing OTF processing
21    sconfig = dm.get_config('sessions')
22
23    # Load default acquisition params for the given microscope
24    micName = b.resource.name
25    acq = sconfig['acquisition'][micName]
26    otf_hosts = sconfig['otf']['hosts']
27
28    data = {
29        'booking': b,
30        'acquisition': acq,
31        'session_name_prefix': '',
32        'otf_hosts': otf_hosts,
33        'otf_host_default': '',
34        'workflows': ['None', 'Scipion'],
35        'workflow_default': '',
36        'transfer_host': '',
37        'cryolo_models': {}
38    }
39    data.update(dc.get_user_projects(b.owner, status='active'))
40    return data

This content function takes one parameter, the booking_id. It is read in line 7 and used in line 10 to fetch the Booking entry from the database using the SQLAlchemy ORM. Line 21 shows how to retrieve “configuration” forms, which follow the naming convention config:NAME, and then use them in the session dialog (or any other template page). In this case, we use config:sessions to automatically populate default acquisition values for different microscopes. Finally, the data dictionary is built from various key-value pairs and returned. Flask will use this data to render the template.
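From the lookups in lines 25 and 26, the config:sessions form is expected to contain an acquisition mapping keyed by microscope name and an otf entry with a hosts list. The sketch below illustrates that shape. The microscope names and acquisition values are invented for this example, and default_acquisition is a hypothetical helper that returns an empty mapping for unknown microscopes instead of raising a KeyError as the direct lookup would.

```python
# Hypothetical content of the config:sessions form; only the keys
# 'acquisition' and 'otf' -> 'hosts' come from the content function
SESSIONS_CONFIG = {
    'acquisition': {
        'Krios01': {'voltage': 300, 'cs': 2.7, 'images_pattern': '*.tiff'},
        'Talos01': {'voltage': 200, 'cs': 2.7, 'images_pattern': '*.mrc'},
    },
    'otf': {'hosts': ['c124663']},
}


def default_acquisition(sconfig, mic_name):
    """Return a copy of the default acquisition for a microscope, or {} if unknown."""
    return dict(sconfig.get('acquisition', {}).get(mic_name, {}))


acq = default_acquisition(SESSIONS_CONFIG, 'Krios01')
print(acq['images_pattern'])
print(default_acquisition(SESSIONS_CONFIG, 'UnknownScope'))
```

Returning a copy avoids accidentally modifying the shared configuration when the acquisition values are later edited for a specific session.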

The Worker Script#

The last component is the worker code in extra/test_worker.py. Workers are typically created by subclassing two classes: TaskHandler and Worker. The Worker class establishes the connection with EMhub and defines the types of tasks it will react to by creating the corresponding TaskHandler. The TaskHandler is then responsible for processing the given task. The code fragment below shows the process function for our TaskHandler.

def process(self):
    try:
        if self.action == 'monitor':
            return self.monitor()
        elif self.action == 'otf_test':
            return self.otf()
        raise Exception(f"Unknown action {self.action}")
    except Exception as e:
        self.update_task({'error': str(e), 'done': 1})
        self.stop()

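Here, the handler's process function dispatches tasks of type monitor or otf_test and raises an exception otherwise. Any exception is caught and reported back to the server as the task's error, the task is marked as done, and the handler stops.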

Important

The process function will be called from an infinite loop. The handler can set the self.sleep attribute to sleep that many seconds between calls. It should also call the function self.stop() when the task is completed (successfully or with failure) and no more processing is needed. The attribute self.count can also be used to know the count of process function calls.
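The calling convention described in this note can be pictured with a small stand-in class. This is only a sketch of the loop under the assumptions stated in the note (a sleep attribute, a count attribute that is 1 on the first call, and a stop() function). SketchHandler and ThreeTimes are hypothetical names, and the real TaskHandler base class may be implemented differently.

```python
import time


class SketchHandler:
    """Stand-in for a task handler; NOT the EMhub TaskHandler class."""

    def __init__(self, sleep=0):
        self.sleep = sleep      # seconds to wait between process() calls
        self.count = 0          # number of process() calls so far
        self._stopped = False

    def stop(self):
        self._stopped = True

    def process(self):
        raise NotImplementedError

    def run(self):
        # Call process() repeatedly until the handler asks to stop
        while not self._stopped:
            self.count += 1
            self.process()
            if not self._stopped:
                time.sleep(self.sleep)


class ThreeTimes(SketchHandler):
    def process(self):
        print(f"process call #{self.count}")
        if self.count == 3:
            self.stop()  # no more processing is needed


handler = ThreeTimes(sleep=0)
handler.run()
```

A handler that never calls stop() would keep this loop running forever, which is why the note insists on calling it when the task completes or fails.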

Below is the monitor function, which checks the number of files and their total size in the input data folder and then updates the task with that information.

 1def monitor(self):
 2    extra = self.session['extra']
 3    raw = extra['raw']
 4    raw_path = raw['path']
 5    # If repeat != 0, then repeat the scanning this number of times
 6    repeat = self.task['args'].get('repeat', 1)
 7
 8    if not os.path.exists(raw_path):
 9        raise Exception(f"Provided RAW images folder '{raw_path}' does not exist.")
10
11    print(Color.bold(f"session_id = {self.session['id']}, monitoring files..."))
12    print(f"    path: {raw['path']}")
13
14    if self.count == 1:
15        self.mf = MovieFiles()
16
17    self.mf.scan(raw['path'])
18    update_args = self.mf.info()
19    raw.update(update_args)
20    self.update_session_extra({'raw': raw})
21
22    if repeat and self.count == repeat:
23        self.stop()
24        update_args['done'] = 1
25
26    # Remove dict from the task update
27    del update_args['files']
28    self.update_task(update_args)

In line 6, an optional task argument called repeat sets the number of times to repeat the scan (default 1); a value of 0 keeps the monitor running indefinitely. The MovieFiles class from the emtools library is instantiated in line 15, on the first call, and used in line 17 to scan the input folder. This class incorporates a caching mechanism to prevent re-reading files that have already been read. In line 20, the function update_session_extra is invoked to update the extra property of the session with the retrieved information. Finally, in line 28, the task is updated with the same information. When the repeat count has been reached (line 22), the update also includes done=1, which marks the task as completed.
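The idea of scanning a folder repeatedly while skipping files that were already seen can be sketched with the standard library alone. FolderScanner below is a simplified, hypothetical stand-in written for this illustration. It is not the MovieFiles class, and the keys returned by its info() function are assumptions.

```python
import os
import tempfile


class FolderScanner:
    """Simplified stand-in for MovieFiles: counts files and sizes, caching known files."""

    def __init__(self):
        self._sizes = {}  # path -> size, filled once per file

    def scan(self, root):
        for dirpath, _dirs, filenames in os.walk(root):
            for fn in filenames:
                path = os.path.join(dirpath, fn)
                if path not in self._sizes:  # skip files already seen
                    self._sizes[path] = os.path.getsize(path)

    def info(self):
        return {'count': len(self._sizes), 'size': sum(self._sizes.values())}


with tempfile.TemporaryDirectory() as raw_path:
    for name, payload in [('movie_001.tiff', b'x' * 10),
                          ('movie_002.tiff', b'y' * 20)]:
        with open(os.path.join(raw_path, name), 'wb') as f:
            f.write(payload)

    scanner = FolderScanner()
    scanner.scan(raw_path)
    scanner.scan(raw_path)  # second pass reuses the cached entries
    summary = scanner.info()
    print(summary)
```

One limitation of this simple cache is that a file still being written when first seen keeps its initial size, so a real implementation would need a way to detect files that are still growing.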

The following code snippet displays the otf function, which shares some similarities with the monitor function but performs different tasks. The main distinctions are in line 13, where a JSON configuration file is created, and in line 20, where the workflow is initiated.

 1def otf(self):
 2    # Some lines omitted here
 3
 4    # Create OTF folder and configuration files for OTF
 5    def _path(*paths):
 6        return os.path.join(otf_path, *paths)
 7
 8    self.pl.mkdir(otf_path)
 9    os.symlink(raw_path, _path('data'))
10    acq = self.session['acquisition']
11    # Make gain relative to input raw data folder
12    acq['gain'] = _path('data', acq['gain'])
13    with open(_path('scipion_otf_options.json'), 'w') as f:
14        opts = {'acquisition': acq, '2d': False}
15        json.dump(opts, f, indent=4)
16
17    otf['status'] = 'created'
18
19    # Now launch Scipion OTF
20    self.pl.system(f"scipion python -m emtools.scripts.emt-scipion-otf --create {otf_path} &")
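The folder preparation in lines 8 to 15 can be tried in isolation. The sketch below reproduces those steps with the standard library only, in a temporary directory and without launching Scipion. The function prepare_otf_folder is hypothetical. Unlike the fragment above, it copies the acquisition dictionary instead of modifying it and skips the symlink if it already exists, so it can be called more than once.

```python
import json
import os
import tempfile


def prepare_otf_folder(raw_path, otf_path, acquisition):
    """Hypothetical helper: create the OTF folder, the data link and the options file."""
    os.makedirs(otf_path, exist_ok=True)
    data_link = os.path.join(otf_path, 'data')
    if not os.path.islink(data_link):
        os.symlink(raw_path, data_link)

    # Work on a copy and make the gain path go through the 'data' link
    acq = dict(acquisition)
    acq['gain'] = os.path.join(data_link, acq['gain'])

    options_file = os.path.join(otf_path, 'scipion_otf_options.json')
    with open(options_file, 'w') as f:
        json.dump({'acquisition': acq, '2d': False}, f, indent=4)
    return options_file


with tempfile.TemporaryDirectory() as tmp:
    raw = os.path.join(tmp, 'raw')
    os.makedirs(raw)
    otf = os.path.join(tmp, 'otf')
    session_acq = {'gain': 'gain.mrc', 'voltage': 300}

    path = prepare_otf_folder(raw, otf, session_acq)
    prepare_otf_folder(raw, otf, session_acq)  # safe to call a second time
    with open(path) as f:
        opts = json.load(f)
    print(opts['acquisition']['gain'])
```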

Other Worker Examples#

Cluster Queues Worker#

This example demonstrates an existing worker responsible for monitoring the jobs of a queueing system. The worker code is simple, mainly defining its capability to handle a “cluster-lsf” task by registering a TaskHandler for it.

class LSFWorker(Worker):
    def handle_tasks(self, tasks):
        for t in tasks:
            if t['name'] == 'cluster-lsf':
                handler = LSFTaskHandler(self, t)
            else:
                handler = DefaultTaskHandler(self, t)
            handler.start()
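When a worker handles several task types, the if/else chain can be replaced by a lookup table that maps task names to handler classes. The following is a generic sketch of that pattern using stand-in classes. DefaultHandler, QueueHandler and HANDLERS are hypothetical names and do not correspond to EMhub classes.

```python
class DefaultHandler:
    """Stand-in handler used for any task name without a specific handler."""

    def __init__(self, worker, task):
        self.worker, self.task = worker, task

    def start(self):
        return f"default:{self.task['name']}"


class QueueHandler(DefaultHandler):
    """Stand-in handler for queue-monitoring tasks."""

    def start(self):
        return f"queues:{self.task['name']}"


# Task name -> handler class; new task types only need a new entry here
HANDLERS = {'cluster-lsf': QueueHandler}


def handle_tasks(worker, tasks):
    results = []
    for t in tasks:
        handler_class = HANDLERS.get(t['name'], DefaultHandler)
        results.append(handler_class(worker, t).start())
    return results


print(handle_tasks(None, [{'name': 'cluster-lsf'}, {'name': 'command'}]))
```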

In the next step, the task handler will implement the process function and use the function LSF().get_queues_json('cryo') to retrieve information about the jobs running on the “cryo” queues. This part can be modified to make this worker compatible with a different queueing system. The retrieved information will be stored in args['queues'] as a JSON string and then sent to the EMhub server using the update_task function.

class LSFTaskHandler(TaskHandler):
    def __init__(self, *args, **kwargs):
        TaskHandler.__init__(self, *args, **kwargs)

    def process(self):
        args = {'maxlen': 2}
        try:
            from emtools.hpc.lsf import LSF
            queues = LSF().get_queues_json('cryo')
            args['queues'] = json.dumps(queues)
        except Exception as e:
            args['error'] = f"Error: {e}"
            args.update({'error': str(e),
                         'stack': traceback.format_exc()})

        self.logger.info("Sending queues info")
        self.update_task(args)
        time.sleep(30)

EPU Session Monitoring#

In this example, we have implemented a TaskHandler that monitors a filesystem path in order to detect new EPU session folders. It utilizes the request_config function to obtain configuration information from the EMhub server. Specifically, it retrieves the location where the raw frames will be written. The process function is designed to be called continually, allowing the handler to scan the location for new folders. As with the previous example, the gathered information is sent back to EMhub as a JSON string through the update_task function.

class FramesTaskHandler(TaskHandler):
    """ Monitor frames folder located at
    config:sessions['raw']['root_frames']. """
    def __init__(self, *args, **kwargs):
        TaskHandler.__init__(self, *args, **kwargs)
        # Load config
        self.sconfig = self.request_config('sessions')
        self.root_frames = self.sconfig['raw']['root_frames']

    def process(self):
        if self.count == 1:
            self.entries = {}

        args = {'maxlen': 2}
        updated = False

        try:
            for e in os.listdir(self.root_frames):
                entryPath = os.path.join(self.root_frames, e)
                s = os.stat(entryPath)
                if os.path.isdir(entryPath):
                    if e not in self.entries:
                        self.entries[e] = {'mf': MovieFiles(), 'ts': 0}
                    dirEntry = self.entries[e]
                    if dirEntry['ts'] < s.st_mtime:
                        dirEntry['mf'].scan(entryPath)
                        dirEntry['ts'] = s.st_mtime
                        updated = True
                elif os.path.isfile(entryPath):
                    if e not in self.entries or self.entries[e]['ts'] < s.st_mtime:
                        self.entries[e] = {
                            'type': 'file',
                            'size': s.st_size,
                            'ts': s.st_mtime
                        }
                        updated = True

            if updated:
                entries = []
                for e, entry in self.entries.items():
                    if 'mf' in entry:  # is a directory
                        newEntry = {
                            'type': 'dir',
                            'size': entry['mf'].total_size,
                            'movies': entry['mf'].total_movies,
                            'ts': entry['ts']
                        }
                    else:
                        newEntry = entry
                    newEntry['name'] = e
                    entries.append(newEntry)

                args['entries'] = json.dumps(entries)
                u = shutil.disk_usage(self.root_frames)
                args['usage'] = json.dumps({'total': u.total, 'used': u.used})

        except Exception as e:
            updated = True  # Update error
            args['error'] = f"Error: {e}"
            args.update({'error': str(e),
                         'stack': traceback.format_exc()})

        if updated:
            self.info("Sending frames folder info")
            self.update_task(args)

        time.sleep(30)
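The core of this handler is the comparison between the stored timestamp of each entry and its current modification time, so that only new or changed entries trigger an update. The sketch below isolates that technique with the standard library. The function changed_entries and the folder names are hypothetical and chosen only for this illustration.

```python
import os
import tempfile


def changed_entries(root, seen):
    """Return names under root whose mtime is newer than the one stored in `seen`."""
    changed = []
    for entry in os.scandir(root):
        mtime = entry.stat().st_mtime
        if seen.get(entry.name, 0) < mtime:
            seen[entry.name] = mtime  # remember the latest timestamp
            changed.append(entry.name)
    return sorted(changed)


with tempfile.TemporaryDirectory() as root:
    os.mkdir(os.path.join(root, 'session_A'))
    seen = {}
    first = changed_entries(root, seen)    # session_A is new
    second = changed_entries(root, seen)   # nothing changed since the last call
    os.mkdir(os.path.join(root, 'session_B'))
    third = changed_entries(root, seen)    # only session_B is new
    print(first, second, third)
```

Keep in mind that a directory's modification time changes only when entries are added or removed directly inside it, so files written deeper in the tree do not update the timestamp of the top-level session folder.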

Data Transfer and On-The-Fly Processing#

Below is a more detailed example of a worker responsible for managing data transfer or processing data in real time during a specific session. It receives new tasks from the EMhub server, retrieves information about the assigned session, and updates the session details as the tasks are processed.

Check the Sessions Worker code on GitHub.