elasticsearch_synchronization
Elasticsearch is a text-processing platform that can be used to enhance the capabilities of a graph database like Memgraph. It offers many fine-grained text-processing features that are impossible to develop inside a database. Data residing in Elasticsearch and Memgraph should be synchronized, because otherwise the whole system could end up in an inconsistent state. Such a feature can be added inside Memgraph by using triggers: every time a new entity (node or relationship) is added, it gets indexed to the Elasticsearch index.
The module supports the following features:
- creating Elasticsearch indexes from Memgraph clients using Cypher
- indexing all data inside Memgraph to Elasticsearch indexes
- managing Elasticsearch authentication in a secure way
- indexing entities (nodes and relationships) as they are being inserted into the database without reindexing old data
- scanning and searching documents from Elasticsearch indexes using Query DSL
- reindexing existing documents from Elasticsearch
When using Elasticsearch synchronization modules:
- Start an Elasticsearch instance and securely store the username, password, path to the certificate file and the instance's URL.
- Connect to the instance by calling the connect procedure.
- Use the create_index procedure to create Elasticsearch indexes for nodes and relationships.
- Index all entities inside the database using the index_db procedure.
- Check that documents were indexed correctly using the scan or search procedure.
The sketch after this list shows the whole sequence.
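A minimal end-to-end sketch of that sequence, reusing the calls documented in the sections below (the URL, certificate path, credentials, index names and schema files are placeholders):
CALL elastic_search_serialization.connect("https://localhost:9200", "http_ca.crt", "<ELASTIC_USER>", "<ELASTIC_PASSWORD>") YIELD connection_status RETURN connection_status;
CALL elastic_search_serialization.create_index("docs_nodes", "node_index_path_schema.json", {analyzer: "mem_analyzer", index_type: "vertex"}) YIELD message_status RETURN message_status;
CALL elastic_search_serialization.create_index("docs_edges", "edge_index_path_schema.json", {analyzer: "mem_analyzer", index_type: "edge"}) YIELD message_status RETURN message_status;
CALL elastic_search_serialization.index_db("docs_nodes", "docs_edges", 4) YIELD number_of_nodes, number_of_edges RETURN number_of_nodes, number_of_edges;
CALL elastic_search_serialization.scan("docs_nodes", "{\"query\": {\"match_all\": {}}}", "5m", false, false, 100, 0, 2.0, False) YIELD documents RETURN documents;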
The module for synchronizing Elasticsearch with Memgraph is organized as a stateful module: the user is expected to perform a sequence of operations using a managed, secure connection to Elasticsearch. The user can use indexes that already exist inside Elasticsearch, but can also choose to create new ones with a custom schema. Indexing can be performed in two ways:
- Index all data residing inside the database.
- Incrementally index entities as they get inserted into the database by using triggers which execute a specific procedure upon some event.
| Trait | Value |
|---|---|
| Module type | module |
| Implementation | Python |
| Parallelism | sequential |
Procedures
You can execute this algorithm on graph projections, subgraphs or portions of the graph.
connect()
The connect() procedure is used to connect to the Elasticsearch instance from Memgraph. It uses a basic authentication scheme with username, password and certificate.
Input:
- subgraph: Graph (OPTIONAL) ➡ A specific subgraph, which is an object of type Graph returned by the project() function, on which the algorithm is run. If subgraph is not specified, the algorithm is computed on the entire graph by default.
- elastic_url: string ➡ URL for connecting to the Elasticsearch instance.
- ca_certs: string ➡ Path to the certificate file.
- elastic_user: string ➡ The user trying to connect to Elasticsearch.
- elastic_password: string ➡ The user's password for connecting to Elasticsearch.
Output:
- connection_status: Dict[string, string] ➡ Connection information.
Usage:
Use the procedure to connect to the Elasticsearch instance:
CALL elastic_search_serialization.connect("https://localhost:9200", "~/elasticsearch-8.4.3/config/certs/http_ca.crt", <ELASTIC_USER>, <ELASTIC_PASSWORD>)
YIELD connection_status
RETURN connection_status;
create_index()
The procedure creates Elasticsearch indexes.
Input:
- subgraph: Graph (OPTIONAL) ➡ A specific subgraph, which is an object of type Graph returned by the project() function, on which the algorithm is run. If subgraph is not specified, the algorithm is computed on the entire graph by default.
- index_name: string ➡ Name of the index that needs to be created.
- schema_path: string ➡ Path to the schema file from which the index will be loaded.
- schema_parameters: Dict[string, Any] ➡ Parameters for the schema:
  - number_of_shards: int ➡ Number of shards the index will use.
  - number_of_replicas: int ➡ Number of replicas the index will use.
  - analyzer: string ➡ Custom analyzer, which can be set to any legal Elasticsearch analyzer.
  - index_type: string ➡ Can be "edge" or "vertex".
Output:
- message_status: Dict[string, string] ➡ Output from the Elasticsearch instance stating whether the index was successfully created.
Usage:
Use the following query to create Elasticsearch indexes:
CALL elastic_search_serialization.create_index("edge_index",
"edge_index_path_schema.json", {analyzer: "mem_analyzer", index_type: "edge"})
YIELD message_status
RETURN message_status;
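The schema parameters can also control sharding and replication; a hypothetical variant of the call above that sets them explicitly (the shard and replica counts are illustrative, and "standard" is the built-in Elasticsearch analyzer):
CALL elastic_search_serialization.create_index("node_index",
"node_index_path_schema.json", {number_of_shards: 1, number_of_replicas: 1, analyzer: "standard", index_type: "vertex"})
YIELD message_status
RETURN message_status;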
index_db()
The procedure is used to serialize all nodes and relationships in the Memgraph database to the Elasticsearch instance. By setting the thread_count, chunk_size, max_chunk_bytes and queue_size parameters, it's possible to hit a good performance spot when indexing large quantities of documents.
Input:
- subgraph: Graph (OPTIONAL) ➡ A specific subgraph, which is an object of type Graph returned by the project() function, on which the algorithm is run. If subgraph is not specified, the algorithm is computed on the entire graph by default.
- node_index: string ➡ The name of the node index. Can be used for both streaming and parallel bulk.
- edge_index: string ➡ The name of the edge index. Can be used for both streaming and parallel bulk.
- thread_count: int ➡ Size of the threadpool to use for the bulk requests.
- chunk_size: int (default=500) ➡ The number of docs sent to Elasticsearch in one chunk.
- max_chunk_bytes: int (default=104857600) ➡ The maximum size of the request in bytes. The default equals 100 MB.
- raise_on_error: bool (default=True) ➡ Raise a BulkIndexError containing the errors (as .errors) from the execution of the last chunk when some occur.
- raise_on_exception: bool ➡ If False, don't propagate exceptions from the call to bulk and just report the items that failed as failed.
- max_retries: int (default=0) ➡ Maximum number of times a document will be retried when a 429 is received. 0 means no retries on 429.
- initial_backoff: float ➡ The number of seconds Elasticsearch should wait before the first retry. Any subsequent retry will wait initial_backoff * 2**retry_number seconds.
- max_backoff: float ➡ The maximum number of seconds a retry will wait.
- yield_ok: float ➡ If set to False, successful documents will be skipped in the output.
- queue_size: int ➡ Size of the task queue between the main thread (producing chunks to send) and the processing threads.
Output:
- number_of_nodes: int ➡ Number of indexed nodes.
- number_of_edges: int ➡ Number of indexed relationships.
Usage:
The procedure can be called in the following way:
CALL elastic_search_serialization.index_db("node_index", "edge_index", 5, 256, 104857600, True, False, 2, 2.0, 600.0, True, 2)
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;
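For readability, here is the same call with each positional argument labeled with the parameter it binds to (the values are illustrative, not tuning recommendations):
CALL elastic_search_serialization.index_db(
  "node_index",  // node_index
  "edge_index",  // edge_index
  5,             // thread_count
  256,           // chunk_size
  104857600,     // max_chunk_bytes
  True,          // raise_on_error
  False,         // raise_on_exception
  2,             // max_retries
  2.0,           // initial_backoff
  600.0,         // max_backoff
  True,          // yield_ok
  2)             // queue_size
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;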
index()
The procedure is meant to be used in combination with triggers for incrementally indexing incoming data, and it shouldn't be called explicitly by the user.
Input:
- subgraph: Graph (OPTIONAL) ➡ A specific subgraph, which is an object of type Graph returned by the project() function, on which the algorithm is run. If subgraph is not specified, the algorithm is computed on the entire graph by default.
- createdObjects: List[Dict[string, Object]] ➡ Objects that are captured by a create trigger.
- node_index: string ➡ The name of the node index. Can be used for both streaming and parallel bulk.
- edge_index: string ➡ The name of the edge index. Can be used for both streaming and parallel bulk.
- thread_count: int ➡ Size of the threadpool to use for the bulk requests.
- chunk_size: int (default=500) ➡ The number of docs sent to Elasticsearch in one chunk.
- max_chunk_bytes: int (default=104857600) ➡ The maximum size of the request in bytes. The default equals 100 MB.
- raise_on_error: bool (default=True) ➡ Raise a BulkIndexError containing the errors (as .errors) from the execution of the last chunk when some occur.
- raise_on_exception: bool ➡ If False, don't propagate exceptions from the call to bulk and just report the items that failed as failed.
- max_retries: int (default=0) ➡ Maximum number of times a document will be retried when a 429 is received. 0 means no retries on 429.
- initial_backoff: float ➡ The number of seconds Elasticsearch should wait before the first retry. Any subsequent retry will wait initial_backoff * 2**retry_number seconds.
- max_backoff: float ➡ The maximum number of seconds a retry will wait.
- yield_ok: float ➡ If set to False, successful documents will be skipped in the output.
- queue_size: int ➡ Size of the task queue between the main thread (producing chunks to send) and the processing threads.
Output:
- number_of_nodes: int ➡ Number of indexed nodes.
- number_of_edges: int ➡ Number of indexed relationships.
Usage:
The procedure can be used in the following way:
CREATE TRIGGER elastic_search_create
ON CREATE AFTER COMMIT EXECUTE
CALL elastic_search_serialization.index(createdObjects, "docs_nodes", "docs_edges")
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;
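If incremental indexing is no longer needed, the trigger can be removed with Memgraph's standard trigger syntax:
DROP TRIGGER elastic_search_create;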
reindex()
Reindex all documents that satisfy a given query from one index to another, potentially (if target_client is specified) on a different cluster. Unless specified differently, all documents will be reindexed.
Input:
- subgraph: Graph (OPTIONAL) ➡ A specific subgraph, which is an object of type Graph returned by the project() function, on which the algorithm is run. If subgraph is not specified, the algorithm is computed on the entire graph by default.
- updatedObjects: List[Dict[string, Any]] ➡ List of all objects that were updated and then sent as arguments to this procedure with the help of the update trigger.
- source_index: Union[string, List[string]] ➡ Identifies the source index (or several of them) from which documents need to be reindexed.
- target_index: string ➡ Identifies the target index to which documents need to be reindexed.
- query: string ➡ Query written as JSON.
- chunk_size: int (default=500) ➡ The number of docs sent to Elasticsearch in one chunk.
- scroll: string ➡ Specifies how long a consistent view of the index should be maintained for scrolled search.
- op_type: Optional[string] ➡ Explicit operation type. Defaults to _index. For data streams it must be set to create. If not specified, it will be auto-detected whether target_index is a data stream.
Output:
- response: string ➡ Number of documents matched by the query in the source_index.
Usage:
To reindex all documents from the source_index to the destination_index, use the following query:
CALL elastic_search_serialization.reindex("source_index", "destination_index", "{\"query\": {\"match_all\": {}}} ")
YIELD response
RETURN response;
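Reindexing can also be limited to a subset of documents by changing the query; a hypothetical example that copies only documents whose name field matches a value (the name field is an assumption about the indexed documents):
CALL elastic_search_serialization.reindex("source_index", "destination_index", "{\"query\": {\"match\": {\"name\": \"n1\"}}}")
YIELD response
RETURN response;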
scan()
Fetches documents from the index specified by index_name that match the query. Supports pagination.
Input:
- subgraph: Graph (OPTIONAL) ➡ A specific subgraph, which is an object of type Graph returned by the project() function, on which the algorithm is run. If subgraph is not specified, the algorithm is computed on the entire graph by default.
- index_name: string ➡ Name of the index.
- query: string ➡ Query written as JSON.
- scroll: string ➡ Specifies how long a consistent view of the index should be maintained for scrolled search.
- raise_on_error: bool (default=True) ➡ Raises an exception (ScanError) if an error is encountered (some shards fail to execute).
- preserve_order: bool ➡ By default, scan() does not return results in any pre-determined order. To have a standard order in the returned documents (either by score or explicit sort definition) when scrolling, set preserve_order=True. Don't set the search_type to scan; this will cause the scroll to paginate while preserving the order. Note that this can be an extremely expensive operation and can easily lead to unpredictable results, so use it with caution.
- size: int ➡ Size (per shard) of the batch sent at each iteration.
- from: int ➡ Starting document offset. By default, you cannot page through more than 10,000 hits using the from and size parameters. To page through more hits, use the search_after parameter.
- request_timeout: mgp.Nullable[float] ➡ Explicit timeout for each call to scan.
- clear_scroll: bool (default=True) ➡ Explicitly calls delete on the scroll ID via the clear scroll API at the end of the procedure, on completion or error.
Output:
- documents: List[Dict[string, string]] ➡ List of all items matched by the specific query.
Usage:
Below is an example scan query that makes use of all parameters:
CALL elastic_search_serialization.scan("edge_index", "{\"query\": {\"match_all\": {}}}", "5m", false, false, 100, 0, 2.0, False)
YIELD documents
RETURN documents;
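Pagination is driven by the size and from arguments; a hypothetical follow-up call that fetches the next batch of 100 documents by raising the offset (all other values mirror the example above):
CALL elastic_search_serialization.scan("edge_index", "{\"query\": {\"match_all\": {}}}", "5m", false, false, 100, 100, 2.0, False)
YIELD documents
RETURN documents;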
search()
Searches for all documents matching the given query in the given index. It is preferred over the scan() procedure because of the possibility to use aggregations.
Input:
- subgraph: Graph (OPTIONAL) ➡ A specific subgraph, which is an object of type Graph returned by the project() function, on which the algorithm is run. If subgraph is not specified, the algorithm is computed on the entire graph by default.
- index_name: string ➡ The name of the index.
- query: string ➡ Query written as JSON.
- size: int ➡ Size (per shard) of the batch sent at each iteration.
- from_: int ➡ Starting document offset. By default, you cannot page through more than 10,000 hits using the from_ and size parameters. To page through more hits, use the search_after parameter.
- aggregations: Optional[Mapping[string, Mapping[string, Any]]] ➡ Check the [Elasticsearch documentation](https://elasticsearch-py.readthedocs.io/en/v8.5.3/api.html#elasticsearch.Elasticsearch.search).
- aggs: Optional[Mapping[string, Mapping[string, Any]]] ➡ Check the [Elasticsearch documentation](https://elasticsearch-py.readthedocs.io/en/v8.5.3/api.html#elasticsearch.Elasticsearch.search).
Output:
- documents: Dict[string, string] ➡ Results matching the query.
Usage:
A query without aggregations that shows how the search procedure can be used:
CALL elastic_search_serialization.search("node_index", "{\"match_all\": {}}", 1000, 0)
YIELD documents
RETURN documents;
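Since aggregations are the main reason to prefer search(), here is a hypothetical call that counts the indexed documents with Elasticsearch's value_count aggregation (the name field and its name.keyword sub-field are assumptions about the index mapping):
CALL elastic_search_serialization.search("node_index", "{\"match_all\": {}}", 1000, 0, {name_count: {value_count: {field: "name.keyword"}}})
YIELD documents
RETURN documents;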
Example
The example shows all the module's features, from connecting to the Elasticsearch instance to synchronizing Memgraph and Elasticsearch using triggers.
Connect and populate
Use the following query to connect to the Elasticsearch instance and populate the database:
CALL elastic_search_serialization.connect("https://localhost:9200", "http_ca.crt", "<ELASTIC_USER>","<ELASTIC_PASSWORD>")
YIELD connection_status
RETURN connection_status;
...queries used to populate...
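The population queries are omitted here; a minimal sketch that creates the kind of data referenced later in this example (the node names and the edge property are assumptions) could look like this:
CREATE (n1 {name: "n1"}), (n2 {name: "n2"}), (n3 {name: "n3"}),
       (n4 {name: "n4"}), (n5 {name: "n5"}), (n6 {name: "n6"});
MATCH (n1 {name: "n1"}), (n2 {name: "n2"})
CREATE (n1)-[:CONNECTION {edge_property: "docs"}]->(n2);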
Create indexes
Use the following queries to create Elasticsearch indexes.
CALL elastic_search_serialization.create_index("docs_nodes", "node_index_path_schema.json",
{analyzer: "mem_analyzer", index_type: "vertex"})
YIELD message_status
RETURN message_status;
CALL elastic_search_serialization.create_index("docs_edges", "edge_index_path_schema.json", {analyzer: "mem_analyzer", index_type: "edge"})
YIELD message_status
RETURN message_status;
Index database
Serialize all nodes and relationships in the Memgraph database to the Elasticsearch instance.
CALL elastic_search_serialization.index_db("docs_nodes", "docs_edges", 4)
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;
Scan
Fetch documents from the index specified by index_name that match the query:
CALL elastic_search_serialization.scan("docs_nodes", "{\"query\": {\"match_all\": {}}}", "5m", false, false, 100, 0, 2.0, False)
YIELD documents
RETURN documents;
The scan returns the documents matched by the query.
Create a trigger
Use the following query to create a trigger that will index newly created data:
CREATE TRIGGER elastic_search_create
ON CREATE AFTER COMMIT EXECUTE
CALL elastic_search_serialization.index(createdObjects, "docs_nodes", "docs_edges")
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;
Insert a new node and relationship
Use the following query to insert a node and a relationship, which will trigger the execution of the procedure:
CREATE (n7 {name: "n7"});
MATCH (n6 {name: "n6"}), (n7 {name: "n7"})
CREATE (n6)-[:NEW_CONNECTION {edge_property: "docs"}]->(n7);
Search
CALL elastic_search_serialization.search("docs_nodes", "{\"match_all\": {}}", 1000, 0)
YIELD documents
RETURN documents;
The search returns the matched documents.