Example of a query module written in Python
We will examine how the query module example is implemented using the C API and the Python API. Both query modules can be found in the /usr/lib/memgraph/query_modules directory.
If you require more information about what query modules are, please read the query modules overview page.
Python API
Query modules can be implemented using the Python API provided by Memgraph. If you wish to write your own query modules using the Python API, you need to have Python version 3.5.0 or above installed.
Every single Memgraph installation comes with the py_example.py query module located in the /usr/lib/memgraph/query_modules directory. It was provided as an example of a .py query module for you to examine and learn from.
If you are working with Docker and would like to open the file on your computer, copy it from the Docker container.
Transferring files from a Docker container:
If you are using Docker to run Memgraph, you can copy the files from the Docker container to your local directory.
1. Start your Memgraph instance using Docker.
2. Open a new terminal and find the CONTAINER ID of the Memgraph Docker container:
docker ps
3. Position yourself in the directory where you want to transfer the file.
4. Copy a file from the container to the current directory:
docker cp <CONTAINER ID>:/usr/lib/memgraph/query_modules/py_example.py py_example.py
Don't forget to replace the <CONTAINER ID>.
You can develop query modules in Python from Memgraph Lab (v2.0 and newer). Just navigate to Query Modules and click on New Module to start.
If you need an additional Python library not included with Memgraph, check out the guide on how to install it.
Readable procedures
Let's take a look at the py_example.py file and its first line:
import mgp
On the first line, we import the mgp module, which contains definitions of the public Python API provided by Memgraph. In essence, this is a wrapper around the C API described in the next section. This file (mgp.py) can be found in the Memgraph installation directory /usr/lib/memgraph/python_support.
Because our procedure will only read from the database, we pass it to the read_proc decorator, which handles read-only procedures. You can also inspect the definition of said decorator in the mgp.py file or take a look at the Python API reference guide.
Next, we define the procedure that will be used as the callback for our py_example.procedure invocation through Cypher.
@mgp.read_proc
def procedure(context: mgp.ProcCtx,
              required_arg: mgp.Nullable[mgp.Any],
              optional_arg: mgp.Nullable[mgp.Any] = None
              ) -> mgp.Record(args=list,
                              vertex_count=int,
                              avg_degree=mgp.Number,
                              props=mgp.Nullable[mgp.Map]):
    ...
Because we need to access the graph to get results, the first argument takes the ProcCtx type, which is actually the graph. Then we define two arguments, a required and an optional one, that will be bound to the values passed in the Cypher query. They can be either null or of any type.
The return type must be Record(field_name=type, ...), and the procedure must produce either a complete Record or None.
In our case, the example procedure returns four fields:
- args: a copy of the arguments passed to the procedure.
- vertex_count: the number of vertices in the database.
- avg_degree: the average degree of vertices.
- props: the properties map of the Vertex or Edge object passed as the required_arg. In case a Path object is passed, the procedure returns the properties map of the starting vertex.
We defined that this procedure can be invoked in Cypher as follows:
MATCH (n) WITH n LIMIT 1 CALL py_example.procedure(n, 1) YIELD * RETURN *;
To get the props result, first we need to check if the passed argument is an Edge, Vertex or Path and create the properties map:
if isinstance(required_arg, (mgp.Edge, mgp.Vertex)):
    props = dict(required_arg.properties.items())
elif isinstance(required_arg, mgp.Path):
    # Take the starting vertex of the path.
    start_vertex = required_arg.vertices[0]
    props = dict(start_vertex.properties.items())
In the case of mgp.Edge and mgp.Vertex, we obtain an instance of the mgp.Properties class and invoke the items() method, which returns an Iterable containing the mgp.Property objects of our mgp.Edge or mgp.Vertex. Since the type of mgp.Property is a simple collections.namedtuple containing name and value, we can easily pass it to a dict constructor, thus creating a map.
To get the vertex_count and avg_degree results, we need to count the number of vertices and edges in our graph:
vertex_count = 0
edge_count = 0
for v in context.graph.vertices:
    vertex_count += 1
    edge_count += sum(1 for e in v.in_edges)
    edge_count += sum(1 for e in v.out_edges)
First, we set our counters and then access the mgp.Graph instance via context.graph. The mgp.Graph instance contains the state of the database at the time of execution of the Cypher query that is calling our procedure. The mgp.Graph instance also has the property vertices that allows us to access the mgp.Vertices object, which can be iterated upon, thus increasing the counter on each traversed vertex.
Similarly, each mgp.Vertex object has in_edges and out_edges properties, allowing us to iterate over the corresponding mgp.Edge objects, thus increasing the counter on each traversed edge.
Lastly, we calculate the avg_degree value and obtain a copy of the passed arguments:
avg_degree = 0 if vertex_count == 0 else edge_count / vertex_count
args_copy = [copy.deepcopy(required_arg), copy.deepcopy(optional_arg)]
At the end, we return a mgp.Record with all the calculated values:
return mgp.Record(args=args_copy, vertex_count=vertex_count,
avg_degree=avg_degree, props=props)
Writeable procedures
Writeable procedures are implemented similarly to read-only procedures. The only difference is that writeable procedures receive mutable objects, so they can create and delete vertices or edges, modify the properties of vertices and edges, and add or remove labels of vertices.
We can implement a very simple writeable query module similarly to a read-only one. The following procedure creates a new vertex with a property whose name and value are passed as arguments, and connects it to all existing vertices that have a property with the same name and value:
@mgp.write_proc
def write_procedure(context: mgp.ProcCtx,
                    property_name: str,
                    property_value: mgp.Nullable[mgp.Any]
                    ) -> mgp.Record(created_vertex=mgp.Vertex):
    # Collect all the vertices that have a property with
    # the same name and value as the passed arguments
    vertices_to_connect = []
    for v in context.graph.vertices:
        if v.properties[property_name] == property_value:
            vertices_to_connect.append(v)
    # Create the new vertex and set its property
    vertex = context.graph.create_vertex()
    vertex.properties.set(property_name, property_value)
    # Connect the new vertex to the other vertices
    for v in vertices_to_connect:
        context.graph.create_edge(vertex, v, mgp.EdgeType("HAS_SAME_VALUE"))
    # Return a field containing the newly created vertex
    return mgp.Record(created_vertex=vertex)
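Assuming this procedure is saved in a module file such as write_example.py (the file name determines the module name), it could be invoked through Cypher along these lines; the module name, property name, and property value are purely illustrative:
CALL write_example.write_procedure("name", "Alice") YIELD created_vertex RETURN created_vertex;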
Batched read procedures
In addition to regular read procedures, Memgraph also supports batched read procedures. The key difference is that batched procedures return results in batches, mostly to reduce memory consumption. For batched procedures, you need to define three functions:
- batching function - similar to the main function in regular procedures
- initialization function - initializes a stream, opens a source file, etc.
- cleanup function - closes a stream, source file, etc.
Since there are three functions, the construct works as follows:
- the initialization function must be defined so that it receives the same parameters, in the same order, as the batching function, including mgp.ProcCtx if it is defined as the first parameter
- when calling the procedure from the query, you call the batching function
- Memgraph calls the initialization function before the batching function
- the batching function needs to return an empty result at some point, which signals the end of the stream
- the cleanup function is called at the end of the stream
There is no decorator for registering a batched read procedure. Instead, register the three functions by calling mgp.add_batch_read_proc(batch_function, init_function, cleanup_function), as shown at the end of the following example, which streams rows from a MySQL table in batches:
import mgp
# The example assumes the `mysql-connector-python` package is installed.
import mysql.connector as mysql_connector
from typing import Any, Dict

mysql_dict = {}


def init_migrate(
    table: str,
    config: mgp.Map,
):
    global mysql_dict
    # Init dict and store variables for later reference.
    if mysql_dict is None:
        mysql_dict = {}
    query = f"SELECT * FROM {table};"
    connection = mysql_connector.connect(**config)
    cursor = connection.cursor(buffered=True)
    # Executes, but doesn't fetch. Fetching is done in batches
    # in `migrate`.
    cursor.execute(query)
    mysql_dict["connection"] = connection
    mysql_dict["cursor"] = cursor
    mysql_dict["column_names"] = [column[0] for column in cursor.description]


def migrate(
    table: str,
    config: mgp.Map,
) -> mgp.Record(row=mgp.Map):
    global mysql_dict
    cursor = mysql_dict["cursor"]
    column_names = mysql_dict["column_names"]
    # An empty batch signals the end of the stream.
    rows = cursor.fetchmany(1000)
    return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows]


def cleanup_migrate():
    global mysql_dict
    mysql_dict["cursor"] = None
    mysql_dict["connection"].commit()
    mysql_dict["connection"].close()
    mysql_dict["connection"] = None
    mysql_dict["column_names"] = None
    mysql_dict = None


mgp.add_batch_read_proc(migrate, init_migrate, cleanup_migrate)


def _name_row_cells(row_cells, column_names) -> Dict[str, Any]:
    return dict(zip(column_names, row_cells))
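If this module were saved as, for example, batch_read_example.py, the batching function is the one you call from Cypher; Memgraph runs init_migrate before it and cleanup_migrate once it returns an empty batch. The module name, table name, and connection parameters below are only illustrative:
CALL batch_read_example.migrate("users", {user: "memgraph", password: "secret", host: "localhost", database: "shop"}) YIELD row RETURN row LIMIT 10;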
Batched write procedures
Similar to batched read procedures, you can define batched write procedures, which also return results in batches, mostly to reduce memory consumption. As for batched read procedures, you need to define three functions:
- batching function - similar to the main function in regular procedures
- initialization function - initializes a stream, opens a source file, etc.
- cleanup function - closes a stream, source file, etc.
Since there are three functions, the construct works as follows:
- the initialization function must be defined so that it receives the same parameters, in the same order, as the batching function, including mgp.ProcCtx if it is defined as the first parameter
- when calling the procedure from the query, you call the batching function
- Memgraph calls the initialization function before the batching function
- the batching function needs to return an empty result at some point, which signals the end of the stream
- the cleanup function is called at the end of the stream
There is no decorator for registering a batched write procedure. Instead, register the three functions by calling mgp.add_batch_write_proc(batch_function, init_function, cleanup_function), as shown at the end of the following example, which creates a vertex for every row of a MySQL table:
import mgp
# The example assumes the `mysql-connector-python` package is installed.
import mysql.connector as mysql_connector
from typing import Any, Dict

mysql_dict = {}


def init_migrate(
    ctx: mgp.ProcCtx,
    table: str,
    config: mgp.Map,
):
    global mysql_dict
    # Init dict and store variables for later reference.
    if mysql_dict is None:
        mysql_dict = {}
    query = f"SELECT * FROM {table};"
    connection = mysql_connector.connect(**config)
    cursor = connection.cursor(buffered=True)
    # Executes, but doesn't fetch. Fetching is done in batches in `migrate`.
    cursor.execute(query)
    mysql_dict["connection"] = connection
    mysql_dict["cursor"] = cursor
    mysql_dict["column_names"] = [column[0] for column in cursor.description]


def migrate(
    ctx: mgp.ProcCtx,
    table: str,
    config: mgp.Map,
) -> mgp.Record(vertex=mgp.Vertex):
    global mysql_dict
    cursor = mysql_dict["cursor"]
    column_names = mysql_dict["column_names"]
    # An empty batch signals the end of the stream.
    rows = cursor.fetchmany(1000)
    results = []
    for row in rows:
        # For every row from the database, create a vertex
        # and set its properties from the row values.
        v = ctx.graph.create_vertex()
        for key, value in _name_row_cells(row, column_names).items():
            v.properties.set(key, value)
        results.append(mgp.Record(vertex=v))
    return results


def cleanup_migrate():
    global mysql_dict
    mysql_dict["cursor"] = None
    mysql_dict["connection"].commit()
    mysql_dict["connection"].close()
    mysql_dict["connection"] = None
    mysql_dict["column_names"] = None
    mysql_dict = None


# Same helper as in the batched read example.
def _name_row_cells(row_cells, column_names) -> Dict[str, Any]:
    return dict(zip(column_names, row_cells))


mgp.add_batch_write_proc(migrate, init_migrate, cleanup_migrate)
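As with the batched read example, the batching function is what gets called from Cypher. If this module were saved as, for example, batch_write_example.py, an illustrative call could look like this:
CALL batch_write_example.migrate("users", {user: "memgraph", password: "secret", host: "localhost", database: "shop"}) YIELD vertex RETURN vertex;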
Magic functions
User-defined functions, or so-called "Memgraph magic functions", are implemented similarly to read and write procedures. The differences lie in the intended use case and graph mutability: users should not modify (create, delete, or update) any graph objects through functions.
Semantically, functions should be small fragments of functionality that do not require long computations and large memory consumption.
The example below shows how to create and run a function. It demonstrates a trivial use case: returning the passed arguments as a list.
@mgp.function
def func_example(context: mgp.FuncCtx,
                 argument: mgp.Any,
                 opt_argument: mgp.Nullable[mgp.Any] = None):
    return_arguments = [argument]
    if opt_argument is not None:
        return_arguments.append(opt_argument)
    # Note that we do not need to specify the result Record as long as it is a
    # Memgraph defined value type.
    return return_arguments
At first glance, defining a function looks very similar to defining a procedure, so let's focus on the differences. The first difference is the context type: FuncCtx prevents you from modifying the graph and does not offer an API for accessing graph entities other than those passed in as arguments.
The second difference is the result signature. Functions do not require you to specify one, because of how the return value is used: a function call can be nested in Cypher, so the only requirement is that the returned value is of a supported mgp.Type.
The Cypher call for the written custom function can be executed like this:
RETURN py_example.func_example("First argument", "Second argument");
This call can also be nested and used as preprocessing for some other function or procedure. For example, here is how to combine a built-in function with the one we just developed:
RETURN head(py_example.func_example("First argument", "Second argument"));
The Python API provided by Memgraph can be a very powerful tool for implementing query modules. We strongly suggest you thoroughly inspect the mgp.py source file located in the Memgraph installation directory /usr/lib/memgraph/python_support.
Do not store any graph elements globally when writing custom query modules with the intent to use them in a different procedure invocation.
Terminate procedure execution
Just as the execution of a Cypher query can be terminated with the TERMINATE TRANSACTIONS "id"; query, so can the execution of a procedure, for example, if it takes too long to yield a response or gets stuck in an infinite loop due to unpredicted input data. The transaction ID is visible in the output of the SHOW TRANSACTIONS; query.
In order for the procedure to be terminable, it has to call ctx.check_must_abort() before crucial parts of the code, such as while loops or similar points where the procedure might become costly.
Consider the following example:
import mgp


@mgp.read_proc
def long_query(ctx: mgp.ProcCtx) -> mgp.Record(my_id=int):
    id = 1
    try:
        while True:
            if ctx.check_must_abort():
                break
            id += 1
    except mgp.AbortError:
        return mgp.Record(my_id=id)
Catching mgp.AbortError ensures that the correct message about termination is sent to the session where the procedure was originally run.
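For example, from another session you would first look up the transaction ID and then terminate the long-running procedure; the ID here is only a placeholder:
SHOW TRANSACTIONS;
TERMINATE TRANSACTIONS "<transaction_id>";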
C API
Query modules can be implemented using the C API provided by Memgraph. Such modules need to be compiled to a shared library so that they can be loaded when Memgraph starts. This means that you can write the procedures in any programming language that can work with C and be compiled to the ELF shared library format (.so).
If the programming language of your choice throws exceptions, these exceptions should never leave the scope of your module! You should have a top-level exception handler that returns an error value and potentially logs the error message. Exceptions that cross the module boundary will cause unexpected issues.
Every single Memgraph installation comes with the example.so query module located in the /usr/lib/memgraph/query_modules directory. It was provided as an example of a query module written with the C API for you to examine and learn from. The query_modules directory also contains the src directory with the example.c file.
Let's take a look at the example.c file.
#include "mg_procedure.h"
In the first line, we include mg_procedure.h, which contains declarations of all functions that can be used to implement a query module procedure. This file is located in the Memgraph installation directory, under /usr/include/memgraph. To compile the module, you will have to pass the appropriate flags to the compiler, for example, clang:
clang -Wall -shared -fPIC -I /usr/include/memgraph example.c -o example.so
Query procedures
Next, we have a procedure function. This function will serve as the callback for our example.procedure invocation through Cypher.
static void procedure(const struct mgp_list *args, const struct mgp_graph *graph,
struct mgp_result *result, struct mgp_memory *memory) {
...
}
If this was C++ you'd probably write the function like this:
namespace {
void procedure(const mgp_list *args, const mgp_graph *graph,
mgp_result *result, mgp_memory *memory) {
try {
...
} catch (const std::exception &e) {
// We must not let any exceptions out of our module.
mgp_result_set_error_msg(result, e.what());
return;
}
}
}
The procedure function receives the list of arguments (args) passed in the query. The parameter result is used to fill in the resulting records of the procedure. Parameters graph and memory are context parameters of the procedure, and they are used in some parts of the provided C API.
For more information on what exactly is possible with the C API, take a look at the mg_procedure.h file or the C API reference guide.
The following snippet contains the mgp_init_module function that registers procedures which can be invoked through Cypher. Even though the example has only one procedure, you can register multiple different procedures in a single module. Procedures are invoked using the CALL <module>.<procedure> ... syntax. The <module> part corresponds to the name of the shared library: since we compile our example into example.so, the module is called example. Procedure names can be different than their corresponding implementation callbacks because the procedure name is defined when registering a procedure.
int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
// Register our `procedure` as a read procedure with the name "procedure".
struct mgp_proc *proc =
mgp_module_add_read_procedure(module, "procedure", procedure);
// Return non-zero on error.
if (!proc) return 1;
// Additional code for better specifying the procedure (omitted here).
...
// Return 0 to indicate success.
return 0;
}
The omitted part specifies the signature of the registered procedure. The signature specification states what kind of arguments a procedure accepts and what the result set of the procedure will be. For information on the signature specification API, take a look at the mg_procedure.h file and read the documentation on functions prefixed with mgp_proc_.
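Once the module is compiled and loaded, the registered procedure is called the same way as the Python example, only through the example module. The arguments shown here are illustrative, since they depend on the omitted signature specification:
MATCH (n) WITH n LIMIT 1 CALL example.procedure(n, 1) YIELD * RETURN *;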
The memory argument passed in is only alive throughout the execution of mgp_init_module, so you must not allocate any global resources with it. If you really need to set up a certain global state, you may do so in mgp_init_module using the standard global allocators.
Consequently, you may want to reset any global state or release global resources in the following function.
int mgp_shutdown_module() {
// Return 0 to indicate success.
return 0;
}
As mentioned before, no exceptions should leave your module. If you are writing the module in a language that throws them, use exception handlers in mgp_init_module and mgp_shutdown_module as well.
Batched query procedures
Similar to batched query procedures in Python, you can add batched query procedures in C. Batched procedures need three functions: one each for batching, initialization, and cleanup.
static void batch(const struct mgp_list *args, const struct mgp_graph *graph,
struct mgp_result *result, struct mgp_memory *memory) {
...
}
static void init(const struct mgp_list *args, const struct mgp_graph *graph,
struct mgp_memory *memory) {
...
}
static void cleanup() {
...
}
The batch function receives the list of arguments (args) passed in the query. The parameter result is used to fill in the resulting records of the procedure. Parameters graph and memory are context parameters of the procedure, and they are used in some parts of the provided C API.
At some point, batch needs to return an empty result to signal that the procedure is done with execution and cleanup can be called. init doesn't receive result, as it is only used for initialization; it receives the same arguments that are registered for, and passed to, the batch function.
Memgraph makes sure to call init before the batch function and cleanup at the end. The user directly invokes only the batch function through Cypher.
The memory argument passed in is only alive throughout the execution of mgp_init_module, so you must not allocate any global resources with it. Consequently, you may want to reset any global state or release global resources in the cleanup function.
For more information on what exactly is possible with the C API, take a look at the mg_procedure.h file or the C API reference guide.
The following snippet contains the mgp_init_module function that registers procedures which can be invoked through Cypher. Even though the example has only one procedure, you can register multiple different procedures in a single module.
Batched procedures are invoked using the CALL <module>.<batch_procedure> ... syntax. The <module> part corresponds to the name of the shared library: since the example is compiled into example.so, the module is called example.
As mentioned, Memgraph makes sure to call init before <batch_procedure> and cleanup once <batch_procedure> signals the end with an empty result.
int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
  // Register our `batch` function as a batched read procedure named "procedure",
  // together with its `init` and `cleanup` functions.
  struct mgp_proc *proc =
      mgp_module_add_batch_read_procedure(module, "procedure", batch, init, cleanup);
  // Return non-zero on error.
  if (!proc) return 1;
  // Additional code for better specifying the procedure (omitted here).
  ...
  // Return 0 to indicate success.
  return 0;
}
Magic functions
Defining a "magic function" is largely similar to defining a query procedure: the steps of defining a callback and registering arguments are repeated, only with a slightly different syntax.
To define a function, the first step is to define a callback. The example only shows C++ code.
namespace {
void function(const mgp_list *args, mgp_func_context *func_ctx,
mgp_func_result *result, mgp_memory *memory) {
try {
...
} catch (const std::exception &e) {
// We must not let any exceptions out of our module.
mgp_func_result_set_error_msg(result, e.what(), memory);
return;
}
}
}
The parameter args is used to fetch the required and optional arguments from the Cypher call. The parameter result defines the resulting value. It can carry either an error or a return value, depending on the runtime execution. There is no mgp_graph argument because the graph is immutable in functions.
To initialize and register the written function as a magic function, write the registration in mgp_init_module. The registered function can then be called in a similar fashion to the built-in functions, just with the syntax specifying the module it is stored in: <module>.<function_name>(...).
int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
// Register our `function` as a Magic function with the name "function".
struct mgp_func *func =
mgp_module_add_function(module, "function", function); // Above defined function pointer
// Return non-zero on error.
if (!func) return 1;
// Additional code for better specifying the function with arguments (omitted here).
...
// Return 0 to indicate success.
return 0;
}
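Assuming the module is compiled into example.so and the function is registered under the name "function", a call could look like this; the arguments are illustrative, since the argument specification is omitted above:
RETURN example.function(1, 2.3);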