Workflows

This module allows you to easily push your data through a determined set of tasks and stop/continue execution if necessary.

Create a workflow

Create a workflow for your data using functions as individual tasks.

from invenio.modules.workflows.tasks.sample_tasks import (
    add_data,
    halt_if_higher_than_20,
)

class myworkflow(object):
    """Add 20 to integer and halt if higher."""
    workflow = [add_data(20),
                halt_if_higher_than_20]

Save it as a new file in your module located under workflows/ with the same name as the class. For example at yourmodule/workflows/myworkflow.py.

The workflow attribute should be a list of functions (or list of lists of tasks) as per the conditions of the underlying workflows-module.
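To illustrate what "a list of functions (or list of lists of tasks)" can look like, here is a minimal sketch that flattens a nested task list and calls each task with the (obj, eng) pair. The names FakeObject, flatten_tasks and run_tasks are hypothetical stand-ins written for this example; they are not part of Invenio or of the underlying workflows module, whose engine does considerably more (halting, persistence, control flow).

```python
class FakeObject(object):
    """Hypothetical stand-in for the object wrapper: it only holds data."""

    def __init__(self, data):
        self.data = data


def flatten_tasks(tasks):
    """Yield callables from a list that may contain nested lists."""
    for task in tasks:
        if isinstance(task, (list, tuple)):
            for inner in flatten_tasks(task):
                yield inner
        else:
            yield task


def run_tasks(tasks, obj, eng=None):
    """Call every task in order with the (obj, eng) pair."""
    for task in flatten_tasks(tasks):
        task(obj, eng)
    return obj


def add_one(obj, eng):
    """Example task: increment the data."""
    obj.data += 1


def double(obj, eng):
    """Example task: double the data."""
    obj.data *= 2


# A flat task followed by a nested group of tasks.
workflow = [add_one, [double, add_one]]
result = run_tasks(workflow, FakeObject(1))
print(result.data)  # (1 + 1) * 2 + 1 = 5
```

The sketch treats a nested list as a plain group of tasks run in order; the real engine may give nested lists additional meaning.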

Create a task

The functions in the workflow are called tasks. Each task must take at least two arguments:

def halt_if_higher_than_20(obj, eng):
    """Check if current data is more than 20."""
    if obj.data > 20:
        eng.halt("Data higher than 20.")

obj (models.BibWorkflowObject)

is the current object being worked on

obj adds extra functionality by wrapping around your data and providing utilities to interact with the Holding Pen interface.

eng (engine.BibWorkflowEngine)

is the current instance of the workflow engine

eng gives you access to manipulate the workflow execution itself and to retrieve all the objects being processed.

Other parameters may be passed as *args or **kwargs.

Pass additional arguments

To allow arguments to be passed to the task from the workflow definition, simply wrap your task in a closure:

def add_data(data_param):
    """Add data_param to the obj.data."""
    def _add_data(obj, eng):
        data = data_param
        obj.data += data

    return _add_data

It can then be called from the workflow definition as add_data(20), returning the inner function.
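The closure pattern can be tried out without Invenio. The sketch below repeats add_data from above and applies the returned inner function to a hypothetical FakeObject that only carries a data attribute; the engine argument is not used by this task, so None is passed in its place.

```python
class FakeObject(object):
    """Hypothetical stand-in for the object wrapper: it only holds data."""

    def __init__(self, data):
        self.data = data


def add_data(data_param):
    """Return a task that adds data_param to obj.data."""
    def _add_data(obj, eng):
        # data_param is captured from the enclosing call.
        obj.data += data_param

    return _add_data


task = add_data(20)      # what the workflow definition stores
obj = FakeObject(10)
task(obj, None)          # what the engine would do for each object
print(obj.data)          # 30

# Each call creates an independent task with its own captured value.
other = FakeObject(1)
add_data(5)(other, None)
print(other.data)        # 6
```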

Run a workflow

Finally, there are two main use cases for running your workflow:

  • run only a single data object, or
  • run multiple data objects through a workflow.

The former uses the models.BibWorkflowObject model API, and the latter uses the api.

Run a single data object

Note

This method is recommended if you only have one data item you want to run through the workflow.

from invenio.modules.workflows.models import BibWorkflowObject
myobj = BibWorkflowObject.create_object()
myobj.set_data(10)
eng = myobj.start_workflow("myworkflow")

Once the workflow completes it will return the engine instance that ran it.

To get the data, simply call the get_data() function of models.BibWorkflowObject:

myobj.get_data()  # outputs: 30

Run multiple data objects

Note

This method is recommended if you need to run several objects through a workflow.

To do this simply import the workflows API function start() and provide a list of objects:

from invenio.modules.workflows.api import start
eng = start(workflow_name="myworkflow", data=[5, 10])

Here we are passing simple data objects in the form of integers.

As usual, the start() function returns the eng instance that ran the workflow. You can query this object to retrieve the data you sent in:

len(eng.objects)  # outputs: 4

Why 4 objects when we only shipped 2 objects? Well, we take initial snapshots (copy of BibWorkflowObject) of the original data. In the example above, we get 4 objects back as each object passed has a snapshot created.

You can also query the engine instance to only give you the objects which are in a certain state.

len(eng.initial_objects)  # outputs: 2

len(eng.halted_objects)  # outputs: 2

len(eng.completed_objects)  # outputs: 0

len(eng.running_objects)  # outputs: 0

len(eng.waiting_objects)  # outputs: 0

len(eng.error_objects)  # outputs: 0

(eng.completed_objects is empty because both objects passed are halted.)

This output is actually representative of snapshots of the objects, not the objects themselves. The _default_ snapshotting behaviour is also evident here: There is one snapshot taken in the beginning of the execution and one when the object reaches one of the other states. A snapshot can only be in a single state.

No object will ever be in the running state under usual operation.
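The counts above can be reproduced with a small stand-alone model of the default snapshotting behaviour: one snapshot at the start and one when the object reaches its final state. This is only a sketch; run_with_snapshots and the state labels are hypothetical and do not reflect how the engine stores snapshots internally.

```python
def run_with_snapshots(values):
    """Model the sample workflow: add 20, then halt if higher than 20."""
    snapshots = []
    for value in values:
        # Snapshot taken at the beginning of the execution.
        snapshots.append(("INITIAL", value))
        result = value + 20
        state = "HALTED" if result > 20 else "COMPLETED"
        # Snapshot taken when the object reaches another state.
        snapshots.append((state, result))
    return snapshots


snapshots = run_with_snapshots([5, 10])
initial = [s for s in snapshots if s[0] == "INITIAL"]
halted = [s for s in snapshots if s[0] == "HALTED"]
completed = [s for s in snapshots if s[0] == "COMPLETED"]

print(len(snapshots))   # 4
print(len(initial))     # 2
print(len(halted))      # 2
print(len(completed))   # 0
```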

Moreover, to retrieve the data from the first object, you can use get_data() as with single objects:

res = eng.halted_objects[0].get_data()
print(res)
# outputs: 25

Run workflows asynchronously

So far we have been running our workflows in the current process. However, for long-running processes we do not want to wait for the workflow to finish before continuing the processing.

Luckily, there is an API to do this:

BibWorkflowObject.start_workflow(delayed=True)
As when running single objects, you can pass the delayed parameter to enable asynchronous execution.
api.start_delayed()
The API provides the function start_delayed() to run a workflow asynchronously.

To use this functionality you need to make sure you are running a task queue such as Celery that will run the workflow in a separate process.

Note

The delayed API returns a worker_result.AsynchronousResultWrapper instead of an engine.BibWorkflowEngine instance.

In order to communicate with such a task queue we make use of worker plugins.
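The general shape of a delayed call (submit the work, get a wrapper back immediately, ask the wrapper for the outcome later) can be sketched with the standard library alone. Here a thread pool stands in for the task queue; ResultWrapper, run_workflow and start_delayed_sketch are hypothetical names for this example and are not the Invenio worker or AsynchronousResultWrapper implementation.

```python
from concurrent.futures import ThreadPoolExecutor


class ResultWrapper(object):
    """Hypothetical wrapper around a pending result."""

    def __init__(self, future):
        self._future = future

    def get(self, timeout=None):
        """Block until the work is done and return its outcome."""
        return self._future.result(timeout=timeout)


def run_workflow(data):
    """Stand-in for the workflow: add 20 to every item."""
    return [value + 20 for value in data]


def start_delayed_sketch(executor, data):
    """Submit the work and return immediately with a wrapper."""
    return ResultWrapper(executor.submit(run_workflow, data))


with ThreadPoolExecutor(max_workers=1) as executor:
    wrapper = start_delayed_sketch(executor, [5, 10])
    # The caller is free to do other work here before collecting the result.
    outcome = wrapper.get(timeout=5)

print(outcome)  # [25, 30]
```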

Workers

A worker is a plugin (or bridge) from the Invenio workflows module to some distributed task queue. By default, we have provided workers for Celery and RQ.

These plugins are used by the worker_engine to launch workflows asynchronously in a task queue.

We recommend using Celery as the default asynchronous worker.

Working with extra data

If you need to add some extra data to the models.BibWorkflowObject that is not suitable to add to the obj.data attribute, you can make use of the obj.extra_data attribute.

The extra_data attribute is basically a normal dictionary that you can fill. However, it contains some additional information by default.

{
    "_tasks_results": {},
    "owner": {},
    "_task_counter": {},
    "_error_msg": None,
    "_last_task_name": "",
    "latest_object": -1,
    "_action": None,
    "redis_search": {},
    "source": "",
    "_task_history": [],
}

This information is used by the models.BibWorkflowObject to store some additional data related to the workflow execution and additional data added by tasks.

It also stores information that is integrated with Holding Pen - the graphical interface for all the data objects.
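Since set_extra_data() is documented in the Models section as replacing the whole dictionary, a read-modify-write pattern keeps the default keys intact when adding your own. The sketch below uses a hypothetical StubObject with the same two method names; the key "my_module_score" and the reduced set of defaults are made up for the example.

```python
import copy

# A subset of the default keys listed above, for illustration only.
DEFAULT_EXTRA_DATA = {
    "_tasks_results": {},
    "_error_msg": None,
    "source": "",
}


class StubObject(object):
    """Hypothetical stand-in exposing get_extra_data/set_extra_data."""

    def __init__(self):
        self._extra_data = copy.deepcopy(DEFAULT_EXTRA_DATA)

    def get_extra_data(self):
        return self._extra_data

    def set_extra_data(self, value):
        # Replaces the whole dictionary, as documented for the real method.
        self._extra_data = value


obj = StubObject()
extra = obj.get_extra_data()       # read the current dictionary
extra["my_module_score"] = 0.75    # add a custom key
obj.set_extra_data(extra)          # write the full dictionary back

print(sorted(obj.get_extra_data()))
```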

Holding Pen

The Holding Pen is the graphical interface over all the data objects that have been executed in a workflow.

The name Holding Pen originates from a library use case of having some incoming bibliographical meta-data records on “hold” - awaiting some human curator to analyze the record and decide if the record should be inserted into the repository.

One common usage of this interface is acceptance of user submissions.

We will take this concept of record approval further throughout this guide as we explain the most common use cases for the Holding Pen.

Note

The Holding Pen is accessible under /admin/holdingpen

Data object display in Holding Pen

To properly represent data objects in the Holding Pen, the workflow definition explained above can be further enriched by adding some static functions to the class.

  • get_title: return the “title” of the data object shown in the table display.

    E.g. title of meta-data record

  • get_description: return a short description of the data object shown in the table display.

    E.g. identifiers and categories

  • formatter: used in the object detailed display to render the data in the object for the user.

    E.g. the detailed record format of a meta-data record.
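A workflow class with these three static functions might look like the sketch below. The argument lists are assumptions: this guide does not state what each function receives, so the example supposes they are handed the data object (called bwo here) and, for formatter, optional keyword arguments. StubObject and the sample record are likewise hypothetical.

```python
class StubObject(object):
    """Hypothetical stand-in exposing only get_data()."""

    def __init__(self, data):
        self._data = data

    def get_data(self):
        return self._data


class myworkflow(object):
    """Workflow definition enriched for display purposes."""

    workflow = []  # tasks omitted for brevity

    @staticmethod
    def get_title(bwo):
        """Title shown in the table display (assumed signature)."""
        return bwo.get_data().get("title", "No title")

    @staticmethod
    def get_description(bwo):
        """Short description shown in the table display (assumed signature)."""
        data = bwo.get_data()
        categories = ", ".join(data.get("categories", []))
        return "%s (%s)" % (data.get("identifier", "no id"), categories)

    @staticmethod
    def formatter(bwo, **kwargs):
        """Detailed rendering of the data (assumed signature)."""
        data = bwo.get_data()
        return "\n".join("%s: %s" % (key, data[key]) for key in sorted(data))


record = StubObject({
    "title": "A sample record",
    "identifier": "id-123",
    "categories": ["physics"],
})
title = myworkflow.get_title(record)
description = myworkflow.get_description(record)
details = myworkflow.formatter(record)
print(title)
print(description)
```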

Actions in Holding Pen

An action in Holding Pen is a generic term describing an action that can be taken on a data object.

To use the example of record approval, we basically mean adding GUI buttons to accept or reject a data object. The action taken (the button pressed) on the data object is then connected to a custom action back-end that may then decide to e.g. continue the workflow or simply delete the object.

Adding an action

By default we have added an approval action which can be used to allow a data object to continue the workflow or be deleted.

workflows/actions/approval.py
Action back-end located in workflows/actions/approval.py that implements render(), render_mini() and resolve(). resolve() handles the continuation or deletion of the data object using the workflows API.
templates/workflows/actions/approval_(main|mini|side).html

Jinja templates used to render the action UI. There are different templates in play here depending on position.

  • mini: displayed in the main table (for a quick action).
  • side: displayed on the right side of the object details page.
  • main: displayed in the middle of the object details page.

static/workflows/actions/approval.js
JavaScript file listening to events in the approval UI to call the back-end via AJAX calls.

To enable the JavaScript to be loaded via RequireJS, you need to override the actions “init” JavaScript file static/workflows/actions/init.js in your overlay and add any initialization code for the action (e.g. attaching events).

Using an action

There are two ways of activating an action:

Task results in Holding Pen

If you want to add some special task results to be displayed on the details page of the data object in Holding Pen, you can use the task results API available in models.BibWorkflowObject.

The API provides functions to manipulate the task results:

models.BibWorkflowObject.add_task_result()
Adds a task result to the end of a list associated with a label (name).
models.BibWorkflowObject.update_task_results()
Update task result for a specific label (name).
models.BibWorkflowObject.get_tasks_results()
Return all task results as a dictionary of the form { name: [result, ..] }

The task result is a dictionary given as context to the template when rendered. The result given here is added to a list of results for this name.

obj = BibWorkflowObject()  # or BibWorkflowObject.query.get(id)
obj.add_task_result("foo", my_result, "path/to/template")

See sample templates under templates/workflows/results/*.html.
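The behaviour described for the three functions (append to a list per name, replace the list for a name, return everything as a dictionary) can be modelled in a few lines. TaskResultsSketch is a hypothetical stand-in that follows the entry structure and default template shown in the Models section; it is not the BibWorkflowObject implementation and does no template rendering.

```python
class TaskResultsSketch(object):
    """Hypothetical model of the task results bookkeeping."""

    def __init__(self):
        self._tasks_results = {}

    def add_task_result(self, name, result,
                        template="workflows/results/default.html"):
        """Append one result to the list stored under name."""
        entry = {"name": name, "result": result, "template": template}
        self._tasks_results.setdefault(name, []).append(entry)

    def update_task_results(self, name, results):
        """Replace the whole list stored under name."""
        self._tasks_results[name] = list(results)

    def get_tasks_results(self):
        """Return all results as {name: [entry, ..]}."""
        return self._tasks_results


obj = TaskResultsSketch()
obj.add_task_result("foo", {"score": 1})
obj.add_task_result("foo", {"score": 2}, "path/to/template")
count_after_add = len(obj.get_tasks_results()["foo"])   # 2

# Keep only the most recent entry for "foo".
latest = obj.get_tasks_results()["foo"][-1]
obj.update_task_results("foo", [latest])
final = obj.get_tasks_results()
print(final)
```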

API

Main API for the workflows.

If you want to run a workflow using the workflows module, this is the high level API you will want to use.

class invenio.modules.workflows.api.WorkerBackend

WorkerBackend is a class representing the worker.

When called, it automatically obtains the worker from the configuration.

worker

Represents the worker.

This cached property returns the worker to use.

Returns: the worker configured in the configuration file.
invenio.modules.workflows.api.continue_oid(oid, start_point='continue_next', **kwargs)

Continue workflow for given object id (oid).

Depending on start_point it may start from previous, current or next task.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a task-id from BibSched, the current user etc.

Parameters:
  • oid (str) – id of BibWorkflowObject to run.
  • start_point (str) – where should the workflow start from? One of:
      • restart_prev: will restart from the previous task
      • continue_next: will continue to the next task
      • restart_task: will restart the current task
Returns:

BibWorkflowEngine that ran the workflow
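One way to picture the three start_point values is as offsets relative to the task the object stopped at. The sketch below is an interpretation for illustration only: next_task_index and the offset table are hypothetical, and the engine may resolve the position differently.

```python
def next_task_index(current_index, start_point="continue_next"):
    """Return the task index to resume from (illustrative only)."""
    # Assumed mapping of each start_point to a relative offset.
    offsets = {
        "restart_prev": -1,   # previous task
        "restart_task": 0,    # current task
        "continue_next": 1,   # next task
    }
    if start_point not in offsets:
        raise ValueError("Unknown start_point: %r" % (start_point,))
    # Never go before the first task.
    return max(current_index + offsets[start_point], 0)


print(next_task_index(3))                   # 4
print(next_task_index(3, "restart_task"))   # 3
print(next_task_index(3, "restart_prev"))   # 2
print(next_task_index(0, "restart_prev"))   # 0
```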

invenio.modules.workflows.api.continue_oid_delayed(oid, start_point='continue_next', **kwargs)

Continue workflow for given object id (oid), asynchronously.

Similar behavior as continue_oid(), except it runs it asynchronously.

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the function AsynchronousResultWrapper.get.

Parameters:
  • oid (str) – id of BibWorkflowObject to run.
  • start_point (str) – where should the workflow start from? One of:
      • restart_prev: will restart from the previous task
      • continue_next: will continue to the next task
      • restart_task: will restart the current task
Returns:

AsynchronousResultWrapper.

invenio.modules.workflows.api.resume_objects_in_workflow(id_workflow, start_point='continue_next', **kwargs)

Resume workflow for any halted or failed objects from given workflow.

This is a generator function and will yield every workflow created per object which needs to be resumed.

To identify the original workflow containing the halted objects, the ID (or UUID) of the workflow is required. The starting point to resume the objects from can optionally be given. By default, the objects resume with their next task in the workflow.

Parameters:
  • id_workflow (str) – id of Workflow with objects to resume.
  • start_point (str) – where should the workflow start from? One of:
      • restart_prev: will restart from the previous task
      • continue_next: will continue to the next task
      • restart_task: will restart the current task

Yields: BibWorkflowEngine that ran the workflow
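Because this is a generator function, nothing is resumed until the generator is iterated. The sketch below shows that behaviour with a hypothetical stand-in, resume_objects_sketch, which records each object id as it is "resumed" and yields a placeholder string where the real function would yield an engine.

```python
resumed = []


def resume_objects_sketch(object_ids):
    """Hypothetical generator: resume one object per iteration."""
    for oid in object_ids:
        resumed.append(oid)            # stands in for continuing one object
        yield "engine-for-%s" % oid    # stands in for the engine instance


generator = resume_objects_sketch([1, 2])
before = list(resumed)      # [] because no object has been resumed yet

engines = list(generator)   # iterating drives the actual work
print(before)               # []
print(engines)              # ['engine-for-1', 'engine-for-2']
print(resumed)              # [1, 2]
```

Calling the function without consuming the result therefore has no effect; wrap the call in list() or a for loop.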

invenio.modules.workflows.api.start(workflow_name, data, **kwargs)

Start a workflow by given name for specified data.

The name of the workflow to start is considered unique and it is equal to the name of a file containing the workflow definition.

The data passed should be a list of object(s) to run through the workflow. For example: a list of dict, JSON string, BibWorkflowObjects etc.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc.

The workflow engine object generated is returned upon completion.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • data (list) – the list of object(s) to run through the workflow.
Returns:

BibWorkflowEngine that ran the workflow.

invenio.modules.workflows.api.start_by_oids(workflow_name, oids, **kwargs)

Start workflow by name with models.BibWorkflowObject ids.

Wrapper to call start() with list of models.BibWorkflowObject ids.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a task-id from BibSched, the current user etc.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • oids (list) – BibWorkflowObject ids to run.
Returns:

BibWorkflowEngine that ran the workflow.

invenio.modules.workflows.api.start_by_oids_delayed(workflow_name, oids, **kwargs)

Start workflow by name with models.BibWorkflowObject ids, asynchronously.

Similar behavior as start_by_oids(), except it calls start_delayed().

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the function AsynchronousResultWrapper.get.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • oids (list) – list of BibWorkflowObject ids to run.
Returns:

AsynchronousResultWrapper.

invenio.modules.workflows.api.start_by_wid(wid, **kwargs)

Re-start given workflow, by workflow uuid (wid).

It is restarted from the beginning with the original data given.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a task-id from BibSched, the current user etc.

Parameters: wid (str) – the workflow uuid. Ex: “550e8400-e29b-41d4-a716-446655440000”.
Returns: BibWorkflowEngine that ran the workflow.
invenio.modules.workflows.api.start_by_wid_delayed(wid, **kwargs)

Re-start given workflow, by workflow uuid (wid), asynchronously.

Similar behavior as start_by_wid(), except it starts the workflow delayed by using one of the defined workers available.

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the function AsynchronousResultWrapper.get.

Parameters: wid (str) – the workflow uuid. Ex: “550e8400-e29b-41d4-a716-446655440000”.
Returns: AsynchronousResultWrapper
invenio.modules.workflows.api.start_delayed(workflow_name, data, **kwargs)

Start a workflow by given name for specified data, asynchronously.

Similar behavior as start(), except it starts the workflow delayed by using one of the defined workers available.

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the function AsynchronousResultWrapper.get.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • data (list) – the list of object(s) to run through the workflow.
Returns:

AsynchronousResultWrapper

Models

Models for BibWorkflow Objects.

class invenio.modules.workflows.models.Workflow(**kwargs)

Represents a workflow instance.

Used by BibWorkflowEngine to store the state of the workflow.

classmethod delete(*a, **k)

Wrapper function to manage DB session.

classmethod get(*criteria, **filters)

Wrapper to get a specified object.

A wrapper for the filter and filter_by functions of SQLAlchemy. Define a dict with which columns should be filtered by which values.

Workflow.get(uuid=uuid)
Workflow.get(Workflow.uuid != uuid)

The function also supports “hybrid” arguments.

Workflow.get(Workflow.module_name != 'i_hate_this_module',
             user_id=user_id)

See also SQLAlchemy BaseQuery’s filter and filter_by documentation.
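The idea of mixing positional criteria with keyword filters can be illustrated without a database. In the sketch below, positional arguments are predicates and keyword arguments are equality checks on named columns. This is a pure-Python analogy, not SQLAlchemy: Row, ROWS and the get function here are hypothetical, and the real method builds a query from SQLAlchemy expressions instead of calling Python functions.

```python
from collections import namedtuple

# Hypothetical in-memory rows standing in for workflow records.
Row = namedtuple("Row", ["uuid", "module_name", "user_id"])
ROWS = [
    Row("a", "deposit", 1),
    Row("b", "harvest", 1),
    Row("c", "deposit", 2),
]


def get(rows, *criteria, **filters):
    """Return rows matching every criterion and every keyword filter."""
    matches = []
    for row in rows:
        # Positional criteria: arbitrary conditions (like filter()).
        if not all(criterion(row) for criterion in criteria):
            continue
        # Keyword filters: column == value (like filter_by()).
        if not all(getattr(row, column) == value
                   for column, value in filters.items()):
            continue
        matches.append(row)
    return matches


by_keyword = get(ROWS, uuid="c")
hybrid = get(ROWS, lambda row: row.module_name != "harvest", user_id=1)
print([row.uuid for row in by_keyword])  # ['c']
print([row.uuid for row in hybrid])      # ['a']
```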

get_extra_data(user_id=0, uuid=None, key=None, getter=None)

Get the extra_data for the object.

Returns a JSON of the column extra_data or if any of the other arguments are defined, a specific value.

You can define either the key or the getter function.

Parameters:
  • key – the key to access the desirable value
  • getter – callable that takes a dict as param and returns a value
classmethod get_most_recent(*criteria, **filters)

Return the most recently modified workflow.

classmethod get_objects(uuid=None)

Return the objects of the workflow.

classmethod get_status(uuid=None)

Return the status of the workflow.

save(*a, **k)

Wrapper function to manage DB session.

set_extra_data(user_id=0, uuid=None, key=None, value=None, setter=None)

Replace extra_data.

Modifies the JSON of the column extra_data or if any of the other arguments are defined, a specific value. You can define either the key, value or the setter function.

Parameters:
  • key – the key to access the desirable value
  • value – the new value
  • setter – a callable that takes a dict as param and modifies it
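The "key or callable" convention used by get_extra_data() and set_extra_data() can be sketched on a plain dictionary. The helper names read_extra_data and write_extra_data are hypothetical, and the choice to let key take precedence over the callable is an assumption of this example; database access and the user_id and uuid arguments are left out.

```python
def read_extra_data(extra_data, key=None, getter=None):
    """Return one value by key, a computed value, or the whole dict."""
    if key is not None:
        return extra_data[key]
    if getter is not None:
        return getter(extra_data)
    return extra_data


def write_extra_data(extra_data, key=None, value=None, setter=None):
    """Set one key, or let a callable modify the dict in place."""
    if key is not None:
        extra_data[key] = value
    elif setter is not None:
        setter(extra_data)
    return extra_data


extra = {"source": "", "owner": {}}

# Simple case: address a single key directly.
write_extra_data(extra, key="source", value="harvest")

# Nested case: a callable receives the dict and changes it.
write_extra_data(extra, setter=lambda d: d["owner"].update(id=1))

source = read_extra_data(extra, key="source")
owner_id = read_extra_data(extra, getter=lambda d: d["owner"]["id"])
print(source)    # harvest
print(owner_id)  # 1
```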
class invenio.modules.workflows.models.BibWorkflowObject(**kwargs)

Data model for wrapping data being run in the workflows.

Main object being passed around in the workflows module when using the workflows API.

It can be instantiated like this:

obj = BibWorkflowObject()
obj.save()

Or, like this:

obj = BibWorkflowObject.create_object()

BibWorkflowObject provides some handy functions such as:

obj.set_data("<xml ..... />")
obj.get_data() == "<xml ..... />"
obj.set_extra_data({"param": value})
obj.get_extra_data() == {"param": value}
obj.add_task_result("myresult", {"result": 1})

Then to finally save the object

obj.save()

Now you can for example run it in a workflow:

obj.start_workflow("sample_workflow")
add_task_result(name, result, template='workflows/results/default.html')

Add a new task result defined by name.

The name is the dictionary key used to group similar types of results as well as a possible label for the result.

The result is a dictionary given as context to the template when rendered. The result given here is added to a list of results for this name.

obj = BibWorkflowObject()  # or BibWorkflowObject.query.get(id)
obj.add_task_result("foo", my_result, "path/to/template")
Parameters:
  • name (string) – The name of the task in human friendly way. It is used as a key and label for the result rendering.
  • result (dict) – The result to store - passed to render_template().
  • template (string) – The location of the template to render the result.
change_status(message)

Change the status.

continue_workflow(start_point='continue_next', delayed=False, **kwargs)

Continue the workflow for this object.

Will continue a previous execution for the object using api.continue_oid() (or api.continue_oid_delayed() if delayed=True).

The parameter start_point allows you to specify the point of where the workflow shall continue:

  • restart_prev: will restart from the previous task
  • continue_next: will continue to the next task
  • restart_task: will restart the current task
Parameters:
  • start_point (str) – where should the workflow start from?
  • delayed (bool) – should the workflow run asynchronously?
Returns:

BibWorkflowEngine (or AsynchronousResultWrapper).

copy(other)

Copy data and metadata except id and id_workflow.

classmethod create_object(*a, **k)

Wrapper function to manage DB session.

classmethod create_object_revision(*a, **k)

Wrapper function to manage DB session.

classmethod delete(*a, **k)

Wrapper function to manage DB session.

classmethod get(*criteria, **filters)

Wrapper of SQLAlchemy to get a BibWorkflowObject.

A wrapper for the filter and filter_by functions of SQLAlchemy. Define a dict with which columns should be filtered by which values.

Workflow.get(uuid=uuid)
Workflow.get(Workflow.uuid != uuid)

The function also supports “hybrid” arguments.

Workflow.get(Workflow.module_name != 'i_hate_this_module',
             user_id=user_id)

See also SQLAlchemy BaseQuery’s filter and filter_by documentation.

get_action()

Retrieve the currently assigned action, if any.

Returns: name of action assigned as string, or None
get_action_message()

Retrieve the message of the currently assigned action, if any.

get_current_task()

Return the current task from the workflow engine for this object.

get_current_task_info()

Return dictionary of current task function info for this object.

get_data()

Get data saved in the object.

get_error_message()

Retrieve the error message, if any.

get_extra_data()

Get extra data saved to the object.

get_formatted_data(of='hd')

Get the formatted representation for this object.

get_log(*criteria, **filters)

Return a list of log entries from BibWorkflowObjectLog.

You can specify additional filters following the SQLAlchemy syntax.

Get all the logs for the object:

b = BibWorkflowObject.query.get(1)
b.get_log()

Get all the logs for the object labeled as ERROR.

b = BibWorkflowObject.query.get(1)
b.get_log(BibWorkflowObjectLog.log_type == logging.ERROR)
Returns: list of BibWorkflowObjectLog
get_tasks_results()

Return the complete set of tasks results.

The result is given as a dictionary where each result is structured like:

task_result = {
   "name": name,
   "result": result,
   "template": template
}
Returns: dictionary of results as {name: [result, ..], ..}
get_workflow_name()

Return the workflow name for this object.

log

Access logger object for this instance.

remove_action()

Remove the currently assigned action.

reset_error_message()

Reset the error message.

save(*a, **k)

Wrapper function to manage DB session.

save_to_file(directory=None, prefix='workflow_object_data_', suffix='.obj')

Save the contents of self.data['data'] to file.

Returns path to saved file.

Warning: Currently assumes non-binary content.

set_action(action, message)

Set the action to be taken for this object.

Assign a special “action” to this object to be taken into consideration in Holding Pen. The action is referred to by a string with the filename minus extension.

A message is also needed to tell the user the action required in a textual way.

Parameters:
  • action (string) – name of the action to add (e.g. “approval”)
  • message (string) – message to show to the user
set_data(value)

Save data to the object.

set_error_message(msg)

Set an error message.

set_extra_data(value)

Save extra data to the object.

Parameters: value (dict) – what you want to replace extra_data with.
start_workflow(workflow_name, delayed=False, **kwargs)

Run the workflow specified on the object.

Will start workflows execution for the object using api.start() (or api.start_delayed() if delayed=True).

Parameters:
  • workflow_name (str) – name of workflow to run
  • delayed (bool) – should the workflow run asynchronously?
Returns:

BibWorkflowEngine (or AsynchronousResultWrapper).

update_task_results(name, results)

Update tasks results by name.

The name is the dictionary key used to group similar types of results as well as a possible label for the result.

This functions allows you to update (replace) the list of results associated with a name where each result is structured like this:

task_result = {
   "name": "foo",
   "result": result,
   "template": template
}
obj = BibWorkflowObject()  # or BibWorkflowObject.query.get(id)
obj.update_task_results("foo", [task_result])
Parameters:
  • name (string) – The name of the task in human friendly way. It is used as a key and label for the result rendering.
  • results (list) – List of results to store - passed to render_template().
  • template (string) – The location of the template to render the result.
class invenio.modules.workflows.models.BibWorkflowObjectLog(**kwargs)

Represents a log entry for BibWorkflowObjects.

This class represents a record of a log emitted by an object into the database. The object must be saved before using this class as it requires the object id.

classmethod delete(id=None)

Delete an instance in database.

classmethod get(*criteria, **filters)

SQLAlchemy wrapper to get BibWorkflowObjectLog entries.

A wrapper for the filter and filter_by functions of SQLAlchemy. Define a dict with which columns should be filtered by which values.

See also SQLAlchemy BaseQuery’s filter and filter_by documentation.

classmethod get_most_recent(*criteria, **filters)

Return the most recently created log.

class invenio.modules.workflows.models.BibWorkflowEngineLog(**kwargs)

Represents a log entry for BibWorkflowEngine.

This class represents a record of a log emitted by an object into the database. The object must be saved before using this class as it requires the object id.

classmethod delete(uuid=None)

Delete an instance in database.

classmethod get(*criteria, **filters)

SQLAlchemy wrapper to get BibWorkflowEngineLog.

A wrapper for the filter and filter_by functions of SQLAlchemy. Define a dict with which columns should be filtered by which values.

See also SQLAlchemy BaseQuery’s filter and filter_by documentation.

classmethod get_most_recent(*criteria, **filters)

Return the most recently created log.

Engine

The workflow engine extension of GenericWorkflowEngine.

class invenio.modules.workflows.engine.BibWorkflowEngine(name=None, uuid=None, curr_obj=0, workflow_object=None, id_user=0, module_name='Unknown', **kwargs)

GenericWorkflowEngine with DB persistence for invenio.workflows.

Adds a SQLAlchemy database model to save workflow states and workflow data.

Overrides key functions in GenericWorkflowEngine to implement logging and certain workarounds for storing data before/after task calls (This part will be revisited in the future).

abortProcessing()

Abort current workflow execution without saving object.

static after_processing(objects, self)

Action after process to update status.

static before_processing(objects, self)

Executed before processing the workflow.

completed_objects

Return completed objects.

counter_object

Return the number of objects.

error_objects

Return error objects.

execute_callback(callback, obj)

Execute the callback (workflow tasks).

extra_data

Sets up property

get_current_object()

Return the currently active BibWorkflowObject.

get_current_taskname()

Get name of current task/step in the workflow (if applicable).

get_default_data_type()

Return default data type from workflow definition.

get_extra_data()

Main method to retrieve data saved to the object.

halt(msg, action=None)

Halt the workflow by raising WorkflowHalt.

Halts the currently running workflow by raising WorkflowHalt.

You can provide a message and the name of an action to be taken (from an action in actions registry).

Parameters:
  • msg (str) – message explaining the reason for halting.
  • action (str) – name of valid action in actions registry.
Raises:

WorkflowHalt
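Halting by raising an exception means a task stops the run from wherever it is, and whatever drives the tasks decides what happens next. The sketch below models that control flow with hypothetical stand-ins (StubWorkflowHalt, StubEngine, StubObject, run); it is not the engine's implementation and skips saving state and assigning actions in Holding Pen.

```python
class StubWorkflowHalt(Exception):
    """Hypothetical stand-in for the WorkflowHalt exception."""

    def __init__(self, message, action=None):
        super(StubWorkflowHalt, self).__init__(message)
        self.message = message
        self.action = action


class StubEngine(object):
    """Hypothetical engine exposing only halt()."""

    def halt(self, msg, action=None):
        raise StubWorkflowHalt(msg, action=action)


class StubObject(object):
    """Hypothetical object wrapper: it only holds data."""

    def __init__(self, data):
        self.data = data


def halt_if_higher_than_20(obj, eng):
    """Task from the guide: halt when the data exceeds 20."""
    if obj.data > 20:
        eng.halt("Data higher than 20.")


def run(tasks, obj, eng):
    """Run tasks in order; report whether the run halted."""
    try:
        for task in tasks:
            task(obj, eng)
    except StubWorkflowHalt as exc:
        return ("HALTED", exc.message)
    return ("COMPLETED", None)


high = run([halt_if_higher_than_20], StubObject(30), StubEngine())
low = run([halt_if_higher_than_20], StubObject(10), StubEngine())
print(high)  # ('HALTED', 'Data higher than 20.')
print(low)   # ('COMPLETED', None)
```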

halted_objects

Return halted objects.

has_completed()

Return True if workflow is fully completed.

id_user

Return the user id.

increase_counter_error()

Indicate we crashed the processing of one object.

increase_counter_finished()

Indicate we finished the processing of one object.

increase_counter_halted()

Indicate we halted the processing of one object.

initial_objects

Return initial objects.

module_name

Return the module name.

name

Return the name.

objects

Return the objects associated with this workflow.

objects_of_statuses(statuses)

Get objects having given statuses.

process(objects)

Process objects.

Parameters: objects – objects to process.
static processing_factory(objects, self)

Default processing factory.

An extended version of the default processing factory with saving objects before successful processing.

Default processing factory, will process objects in order.

Parameters: objects (list) – list of objects (passed in by self.process()).

As the workflow engine (WFE) proceeds, it increments its internal counter. The first position holds the number of the current element; this pointer increases before the object is taken.

The second position is reserved for the array that points to the task position. The number there points to the task currently being executed; when an error happens, it is left unchanged. The pointer is updated after the task has finished running.

reset_extra_data()

Reset extra data to defaults.

restart(obj, task)

Restart the workflow engine at given object and task.

Will restart the workflow engine instance at given object and task relative to current state.

obj must be either:

  • “prev”: previous object
  • “current”: current object
  • “next”: next object
  • “first”: first object

task must be either:

  • “prev”: previous task
  • “current”: current task
  • “next”: next task
  • “first”: first task

To continue with next object from the first task:

wfe.restart("next", "first")
Parameters:
  • obj (str) – the object which should be restarted
  • task (str) – the task which should be restarted
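The relative keywords can be thought of as a way to compute a new (object, task) position from the current one. The sketch below shows that reading with a hypothetical helper, resolve_position; the offsets and the clamping at zero are assumptions of this example and not taken from the engine.

```python
def resolve_position(current, keyword):
    """Translate a relative keyword into an absolute index."""
    if keyword == "first":
        return 0
    offsets = {"prev": -1, "current": 0, "next": 1}
    if keyword not in offsets:
        raise ValueError("Unknown keyword: %r" % (keyword,))
    # Never move before the first position.
    return max(current + offsets[keyword], 0)


def restart_sketch(current_obj, current_task, obj, task):
    """Return the (object index, task index) to restart from."""
    return (resolve_position(current_obj, obj),
            resolve_position(current_task, task))


# Equivalent in spirit to wfe.restart("next", "first") when the engine
# is at object 2, task 4.
position = restart_sketch(2, 4, "next", "first")
print(position)  # (3, 0)

# Retry the current object at its current task.
retry = restart_sketch(2, 4, "current", "current")
print(retry)     # (2, 4)
```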
running_objects

Return running objects.

save(status=None)

Save the workflow instance to database.

set_counter_initial(obj_count)

Initiate the counters of object states.

Parameters: obj_count (int) – Number of objects to process.
set_extra_data(value)

Main method to update data saved to the object.

set_extra_data_params(**kwargs)

Add keys/value in extra_data.

Allows values to be added to the extra_data dictionary; all data must be passed as “key=value”.

set_task_position(new_position)

Set current task position.

set_workflow_by_name(workflow_name)

Configure the workflow to run by its name.

Changes the workflow that the engine will run by looking up the given name in the registry.

Parameters: workflow_name (str) – name of the workflow.
skipToken()

Skip current workflow object without saving it.

status

Return the status.

uuid

Return the uuid.

waiting_objects

Return waiting objects.

class invenio.modules.workflows.engine.WorkflowStatus

The different possible values for Workflow Status.

Workers

Mediator between API and workers responsible for running the workflows.

invenio.modules.workflows.worker_engine.continue_worker(oid, restart_point='continue_next', task_offset=1, **kwargs)

Restart the workflow for the object with the given id (oid) at the given point.

By providing the restart_point you can change the point from which the workflow will continue.

  • restart_prev: will restart from the previous task
  • continue_next: will continue to the next task (default)
  • restart_task: will restart the current task

**kwargs can be used to pass custom arguments to the engine/object.

Parameters:
  • oid (int) – object id of the object to process
  • restart_point (str) – point to continue from
Returns:

BibWorkflowEngine instance
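One way to read the three restart points is as arguments to the engine's restart(obj, task) method documented above. The mapping below is an assumption made for illustration, not the worker's actual code; RESTART_POINTS and restart_arguments are hypothetical names.

```python
# Hypothetical mapping from restart_point to (obj, task) keywords.
RESTART_POINTS = {
    "restart_prev": ("current", "prev"),
    "continue_next": ("current", "next"),
    "restart_task": ("current", "current"),
}


def restart_arguments(restart_point="continue_next"):
    """Return the (obj, task) pair for a restart point (illustrative only)."""
    try:
        return RESTART_POINTS[restart_point]
    except KeyError:
        raise ValueError("Unknown restart point: %r" % (restart_point,))


obj_keyword, task_keyword = restart_arguments("restart_prev")
```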

invenio.modules.workflows.worker_engine.create_data_object_from_data(data_object, engine, data_type)

Create a new BibWorkflowObject from given data and return it.

Returns a data object wrapped around the given data_object. In addition, it creates an initial snapshot.

Parameters:
  • data_object (object) – object containing the data
  • engine (engine.BibWorkflowEngine) – instance of the workflow that is currently running.
  • data_type (str) – type of the data given as taken from workflow definition.
Returns:

new BibWorkflowObject

invenio.modules.workflows.worker_engine.generate_snapshot(workflow_object, engine)

Save a version of the BibWorkflowObject passed as a parameter.

Given a workflow object, generate a snapshot of its current state and return the given instance to work on.

Also checks if the given workflow instance has a valid version and is not in RUNNING state.

Parameters:
  • workflow_object (models.BibWorkflowObject) – BibWorkflowObject to create snapshot from.
  • engine (engine.BibWorkflowEngine) – instance of the workflow that is currently running.
Returns:

BibWorkflowObject – workflow_object instance

Raises:

WorkflowObjectVersionError

invenio.modules.workflows.worker_engine.get_workflow_object_instances(data, engine)

Analyze data and create corresponding BibWorkflowObjects.

Wrap each item in the given list of data objects into BibWorkflowObject instances - creating appropriate versions of objects in the database and returning a list of these objects.

This process is necessary to save an initial version of the data before running it (and potentially changing it) in the workflow.

This function also takes into account if given data objects are already BibWorkflowObject instances.

Parameters:
  • data (list) – list of data objects to wrap
  • engine (engine.BibWorkflowEngine) – instance of BibWorkflowEngine
Returns:

list of BibWorkflowObject
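The wrapping rule described above can be sketched without a database. WrappedObject and wrap_all are hypothetical stand-ins for BibWorkflowObject and this function; the sketch only shows that items which are already wrapped are passed through unchanged, and it does not save any initial version.

```python
class WrappedObject(object):
    """Hypothetical stand-in for BibWorkflowObject."""

    def __init__(self, data):
        self.data = data


def wrap_all(data):
    """Wrap each item unless it is already a WrappedObject."""
    wrapped = []
    for item in data:
        if isinstance(item, WrappedObject):
            # Already wrapped: reuse the same instance.
            wrapped.append(item)
        else:
            wrapped.append(WrappedObject(item))
    return wrapped


existing = WrappedObject(5)
objects = wrap_all([10, existing, "text"])
```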

invenio.modules.workflows.worker_engine.restart_worker(wid, **kwargs)

Restart workflow from beginning with given id (wid) and any data.

**kwargs can be used to pass custom arguments to the engine/object such as data. If data is not specified then it will load all initial data for the data objects.

Data can be specified as a list of objects or as a single BibWorkflowObject id.

Parameters: wid (str) – workflow id (uuid) of the workflow to be restarted
Returns: BibWorkflowEngine instance
invenio.modules.workflows.worker_engine.run_worker(wname, data, **kwargs)

Run a workflow by name with list of data objects.

The list of data can also contain BibWorkflowObjects.

**kwargs can be used to pass custom arguments to the engine/object.

Parameters:
  • wname (str) – name of workflow to run.
  • data (list) – objects to run through the workflow.
Returns:

BibWorkflowEngine instance

Contain the AsynchronousResultWrapper class for asynchronous execution.

class invenio.modules.workflows.worker_result.AsynchronousResultWrapper(asynchronousresult)

Wrap asynchronous results.

This is an abstract class. Subclasses must implement all of its methods.

The class serves two purposes: it gives all workers a unified interface, so that you can switch from one worker to another seamlessly, and it adds features to the wrapped functions.

For example, the get method allows post-processing of the result.

get(postprocess=None)

Return the value of the process.

status()

Return the current status of the tasks.
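To make the get/status interface concrete, here is a minimal sketch of a subclass. It does not use the real AsynchronousResultWrapper: ResultWrapper and FutureResultWrapper are hypothetical names, and wrapping a concurrent.futures.Future from the standard library is an assumption chosen so the example runs without a worker backend.

```python
from concurrent.futures import ThreadPoolExecutor


class ResultWrapper(object):
    """Hypothetical base class mirroring the get/status interface."""

    def __init__(self, asynchronousresult):
        self.asyncresult = asynchronousresult

    def get(self, postprocess=None):
        raise NotImplementedError

    def status(self):
        raise NotImplementedError


class FutureResultWrapper(ResultWrapper):
    """Wrap a concurrent.futures.Future (an assumption for illustration)."""

    def get(self, postprocess=None):
        # Block until the result is ready, then optionally transform it.
        result = self.asyncresult.result()
        if postprocess is None:
            return result
        return postprocess(result)

    def status(self):
        if self.asyncresult.done():
            return "done"
        if self.asyncresult.running():
            return "running"
        return "pending"


with ThreadPoolExecutor(max_workers=1) as executor:
    wrapped = FutureResultWrapper(executor.submit(sum, [1, 2, 3]))
    doubled = wrapped.get(postprocess=lambda value: value * 2)
    final_status = wrapped.status()
```

Because callers only touch get and status, a wrapper for a different worker backend could be swapped in without changing the calling code.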

invenio.modules.workflows.worker_result.uuid_to_workflow(uuid)

Return the workflow associated with a UUID.

Views

View blueprints for Holding Pen.

Holding Pen is a web interface overlay for all BibWorkflowObjects.

This area is aimed at catalogers and administrators who need to inspect and react to workflow executions. More importantly, it allows users to deal with halted workflows.

For example, accepting submissions or performing other tasks.

invenio.modules.workflows.views.holdingpen.continue_record(*args, **kwargs)

Continue workflow for current object.

invenio.modules.workflows.views.holdingpen.delete_from_db(*args, **kwargs)

Delete the object from the db.

invenio.modules.workflows.views.holdingpen.delete_multi(*args, **kwargs)

Delete list of objects from the db.

invenio.modules.workflows.views.holdingpen.details(*args, **kwargs)

Display info about the object.

invenio.modules.workflows.views.holdingpen.entry_data_preview(*args, **kwargs)

Present the data in a human-readable form or as XML code.

invenio.modules.workflows.views.holdingpen.get_context(*args, **kwargs)

Return a JSON structure with URL maps and actions.

invenio.modules.workflows.views.holdingpen.get_file_from_task_result(*args, **kwargs)

Send the requested file to user from a workflow task result.

Expects a certain file meta-data structure in task result:

{
    "type": "Fulltext",
    "filename": "file.pdf",
    "full_path": "/path/to/file",
}
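A caller could check a task result against this structure before requesting the file. The helper below is a hypothetical sketch, not part of the view; it only tests for the three keys shown above.

```python
REQUIRED_KEYS = ("type", "filename", "full_path")


def missing_file_keys(task_result):
    """Return the expected keys absent from task_result (hypothetical check)."""
    return [key for key in REQUIRED_KEYS if key not in task_result]


complete = {
    "type": "Fulltext",
    "filename": "file.pdf",
    "full_path": "/path/to/file",
}
incomplete = {"type": "Fulltext"}
```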
invenio.modules.workflows.views.holdingpen.index(*args, **kwargs)

Display main interface of Holdingpen.

Acts as a hub for catalogers (may be removed)

invenio.modules.workflows.views.holdingpen.load_table(*args, **kwargs)

Get JSON data for the Holdingpen table.

Passes JSON data to DataTables:

  1. First checks which record version to show.
  2. Then checks the sorting direction.
  3. Then checks whether the user searched for something.
Returns: JSON-formatted str built from a dict of DataTables args.
invenio.modules.workflows.views.holdingpen.maintable(*args, **kwargs)

Display main table interface of Holdingpen.

invenio.modules.workflows.views.holdingpen.resolve_action(*args, **kwargs)

Resolve the action taken.

Will call the resolve() function of the specific action.

invenio.modules.workflows.views.holdingpen.restart_record(*args, **kwargs)

Restart the initial object in its workflow.

invenio.modules.workflows.views.holdingpen.restart_record_prev(*args, **kwargs)

Restart the last task for current object.