elasticsearch_synchronization

Elasticsearch is a text-processing platform that can be used to enhance the capabilities of a graph database like Memgraph. It offers many fine-grained text-processing features that would be impossible to develop inside a database. Data residing in Elasticsearch and Memgraph needs to be synchronized because otherwise the system as a whole could end up in an inconsistent state. Such synchronization can be implemented inside Memgraph using triggers: every time a new entity (node or relationship) is added, it gets indexed into the Elasticsearch index.

The module supports the following features:

  • creating Elasticsearch indexes from Memgraph clients using Cypher
  • indexing all data inside Memgraph to Elasticsearch indexes
  • managing Elasticsearch authentication in a secure way
  • indexing entities (nodes and relationships) as they are being inserted into the database without reindexing old data
  • scanning and searching documents from Elasticsearch indexes using Query DSL
  • reindexing existing documents from Elasticsearch

When using the Elasticsearch synchronization module, follow these steps:

  1. Start an Elasticsearch instance and securely store the username, password, path to the certificate file and the instance's URL.
  2. Connect to the instance by calling the connect procedure.
  3. Use the create_index procedure to create Elasticsearch indexes for nodes and relationships.
  4. Index all entities inside the database using the index_db procedure.
  5. Check that documents were indexed correctly using the scan or search procedure.

The module for synchronizing Elasticsearch with Memgraph is organized as a stateful module: the user is expected to perform a sequence of operations over a managed, secure connection to Elasticsearch. Indexes that already exist inside Elasticsearch can be reused, but new ones can also be created with a custom schema. Indexing can be performed in two ways:

  1. Index all data residing inside the database.
  2. Incrementally index entities as they get inserted into the database by using triggers which execute a specific procedure upon some event.

Trait           Value
Module type     module
Implementation  Python
Parallelism     sequential

Procedures

connect()

The connect() procedure connects Memgraph to the Elasticsearch instance. It uses a basic authentication scheme with a username, password and certificate.

Input:

  • elastic_url: string -> URL for connecting to the Elasticsearch instance.
  • ca_certs: string -> Path to the certificate file.
  • elastic_user: string -> The user trying to connect to Elasticsearch.
  • elastic_password: string -> User's password for connecting to Elasticsearch.

Output:

  • connection_status: Dict[string, string] -> Connection information.

Usage:

Use the procedure to connect to the Elasticsearch instance:

CALL elastic_search_serialization.connect("https://localhost:9200", "~/elasticsearch-8.4.3/config/certs/http_ca.crt", <ELASTIC_USER>, <ELASTIC_PASSWORD>)
YIELD connection_status
RETURN connection_status;

create_index()

The procedure creates an Elasticsearch index.

Input:

  • index_name: string -> Name of the index that needs to be created.
  • schema_path: string -> Path to the schema file from which the index will be loaded.
  • schema_parameters: Dict[string, Any]
    • number_of_shards: int -> Number of shards the index will use.
    • number_of_replicas: int -> Number of replicas the index will use.
    • analyzer: string -> Custom analyzer, which can be set to any legal Elasticsearch analyzer.
    • index_type: string -> Can be "edge" or "vertex".

Output:

  • message_status: Dict[string, string] -> Response from the Elasticsearch instance indicating whether the index was successfully created.

Usage:

Use the following query to create Elasticsearch indexes:

CALL elastic_search_serialization.create_index("edge_index",
"edge_index_path_schema.json", {analyzer: "mem_analyzer", index_type: "edge"}) 
YIELD message_status
RETURN message_status;
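
The schema file referenced by schema_path is a standard Elasticsearch index definition containing settings and mappings. As an illustration only, a hypothetical edge_index_path_schema.json could look like the following; the property names are assumptions, not prescribed by the module:

{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "edge_property": { "type": "text" }
    }
  }
}

Values such as number_of_shards, number_of_replicas and the analyzer can also be supplied through schema_parameters, as in the query above.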

index_db()

The procedure serializes all nodes and relationships in the Memgraph database to the Elasticsearch instance. By tuning the thread_count, chunk_size, max_chunk_bytes and queue_size parameters, it's possible to find a good performance sweet spot when indexing large quantities of documents.

Input:

  • node_index: string -> The name of the node index. Can be used for both streaming and parallel bulk.
  • edge_index: string -> The name of the edge index. Can be used for both streaming and parallel bulk.
  • thread_count: int -> Size of the threadpool to use for the bulk requests.
  • chunk_size: int (default=500) -> The number of docs sent to Elasticsearch in one chunk.
  • max_chunk_bytes: int (default=104857600) -> The maximum size of the request in bytes. The default equals 100 MB.
  • raise_on_error: bool (default=True) -> Raise a BulkIndexError containing the errors (as .errors) from the execution of the last chunk when errors occur.
  • raise_on_exception: bool -> If False, exceptions from the calls to bulk are not propagated; the items that failed are simply reported as failed.
  • max_retries: int (default=0) -> Maximum number of times a document will be retried when a 429 is received. 0 means no retries on 429.
  • initial_backoff: float -> The number of seconds Elasticsearch should wait before the first retry. Any subsequent retry will wait initial_backoff * 2**retry_number seconds.
  • max_backoff: float -> The maximum number of seconds a retry will wait.
  • yield_ok: bool -> If set to False, successful documents will be skipped in the output.
  • queue_size: int -> Size of the task queue between the main thread (producing chunks to send) and the processing threads.

Output:

  • number_of_nodes: int -> Number of indexed nodes.
  • number_of_edges: int -> Number of indexed relationships.

Usage:

The procedure can be called in the following way:

CALL elastic_search_serialization.index_db("node_index", "edge_index", 5, 256, 104857600, True, False, 2, 2.0, 600.0, True, 2) 
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;
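
Since the call above is entirely positional, here is the same call annotated with Cypher comments, assuming the arguments follow the order of the input list above:

CALL elastic_search_serialization.index_db(
  "node_index",  // node_index
  "edge_index",  // edge_index
  5,             // thread_count
  256,           // chunk_size
  104857600,     // max_chunk_bytes
  True,          // raise_on_error
  False,         // raise_on_exception
  2,             // max_retries
  2.0,           // initial_backoff
  600.0,         // max_backoff
  True,          // yield_ok
  2)             // queue_size
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;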

index()

The procedure is meant to be used in combination with triggers for incrementally indexing incoming data, and it shouldn't be called explicitly by the user.

Input:

  • createdObjects: List[Dict[string, Object]] -> Objects that are captured by a create trigger.
  • node_index: string -> The name of the node index. Can be used for both streaming and parallel bulk.
  • edge_index: string -> The name of the edge index. Can be used for both streaming and parallel bulk.
  • thread_count: int -> Size of the threadpool to use for the bulk requests.
  • chunk_size: int (default=500) -> The number of docs sent to Elasticsearch in one chunk.
  • max_chunk_bytes: int (default=104857600) -> The maximum size of the request in bytes. The default equals 100 MB.
  • raise_on_error: bool (default=True) -> Raise a BulkIndexError containing the errors (as .errors) from the execution of the last chunk when errors occur.
  • raise_on_exception: bool -> If False, exceptions from the calls to bulk are not propagated; the items that failed are simply reported as failed.
  • max_retries: int (default=0) -> Maximum number of times a document will be retried when a 429 is received. 0 means no retries on 429.
  • initial_backoff: float -> The number of seconds Elasticsearch should wait before the first retry. Any subsequent retry will wait initial_backoff * 2**retry_number seconds.
  • max_backoff: float -> The maximum number of seconds a retry will wait.
  • yield_ok: bool -> If set to False, successful documents will be skipped in the output.
  • queue_size: int -> Size of the task queue between the main thread (producing chunks to send) and the processing threads.

Output:

  • number_of_nodes: int -> Number of indexed nodes.
  • number_of_edges: int -> Number of indexed edges.

Usage:

The procedure can be used in the following way:

CREATE TRIGGER elastic_search_create
ON CREATE AFTER COMMIT EXECUTE
CALL elastic_search_serialization.index(createdObjects, "docs_nodes", "docs_edges") 
YIELD number_of_nodes, number_of_edges 
RETURN number_of_nodes, number_of_edges;

reindex()

Reindexes all documents that satisfy a given query from one index to another, potentially (if target_client is specified) on a different cluster. Unless specified otherwise, all documents are reindexed.

Input:

  • updatedObjects: List[Dict[string, Any]] -> List of all objects that were updated and then sent as arguments to this procedure with the help of the update trigger.
  • source_index: Union[string, List[string]] -> The source index (or indexes) from which documents will be reindexed.
  • target_index: string -> The target index to which documents will be reindexed.
  • query: string -> Query written as JSON.
  • chunk_size: int (default=500) -> The number of docs sent to Elasticsearch in one chunk.
  • scroll: string -> Specifies how long a consistent view of the index should be maintained for scrolled search.
  • op_type: Optional[string] -> Explicit operation type. Defaults to _index. Data streams must be set to create. If not specified, will auto-detect whether target_index is a data stream.

Output:

  • response: string -> Number of documents matched by a query in the source_index.

Usage:

To reindex all documents from the source_index to the destination_index, use the following query:

CALL elastic_search_serialization.reindex("source_index", "destination_index", "{\"query\": {\"match_all\": {}}} ") 
YIELD response
RETURN response;
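
Any Query DSL expression can be used in place of match_all to reindex only a subset of documents. For example, assuming the documents carry a name property (as in the example graph later on this page), a match query would restrict reindexing to the matching documents:

CALL elastic_search_serialization.reindex("source_index", "destination_index", "{\"query\": {\"match\": {\"name\": \"n7\"}}}")
YIELD response
RETURN response;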

scan()

Fetches documents from the index specified by index_name that match the query. Supports pagination.

Input:

  • index_name: string -> Name of the index.
  • query: string -> Query written as JSON.
  • scroll: string -> Specifies how long a consistent view of the index should be maintained for scrolled search.
  • raise_on_error: bool (default=True) -> Raises an exception (ScanError) if an error is encountered (some shards fail to execute).
  • preserve_order: bool -> By default, scan() does not return results in any pre-determined order. To have a standard order in the returned documents (either by score or by an explicit sort definition) when scrolling, set preserve_order=True. This causes the scroll to paginate while preserving the order, which can be an extremely expensive operation and can easily lead to unpredictable results; use it with caution.
  • size: int -> Size (per shard) of the batch sent at each iteration.
  • from: int -> Starting document offset. By default, you cannot page through more than 10,000 hits using the from and size parameters. To page through more hits, use the search_after parameter.
  • request_timeout: mgp.Nullable[float] -> Explicit timeout for each call to scan.
  • clear_scroll: bool (default=True) -> Explicitly calls delete on the scroll ID via the clear scroll API at the end of the procedure on completion or error.

Output:

  • documents: List[Dict[string, string]] -> List of all items matched by the specific query.

Usage:

Below is an example scan query that makes use of all parameters:

CALL elastic_search_serialization.scan("edge_index", "{\"query\": {\"match_all\": {}}}", "5m", false, false, 100, 0, 2.0, False) 
YIELD documents
RETURN documents;
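
Since the arguments are positional, here is the same call annotated with Cypher comments, assuming the arguments follow the order of the input list above:

CALL elastic_search_serialization.scan(
  "edge_index",                        // index_name
  "{\"query\": {\"match_all\": {}}}",  // query
  "5m",                                // scroll
  false,                               // raise_on_error
  false,                               // preserve_order
  100,                                 // size
  0,                                   // from
  2.0,                                 // request_timeout
  False)                               // clear_scroll
YIELD documents
RETURN documents;

Raising the from argument (together with size) pages through the results, subject to the 10,000-hit limit noted above.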

search()

Searches for all documents by specifying the query and the index. It is preferred over the scan() procedure because it supports aggregations.

Input:

  • index_name: string -> Name of the index.
  • query: string -> Query written as JSON.
  • size: int -> The number of hits to return.
  • from: int -> Starting document offset.

Output:

  • documents: Dict[string, string] -> Results matching the query.

Usage:

A query without aggregations, showing how the search procedure can be used:

CALL elastic_search_serialization.search("node_index",  "{\"match_all\": {}}", 1000, 0) 
YIELD documents
RETURN documents;
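
Aggregations are expressed in regular Elasticsearch Query DSL. As a sketch only, a terms aggregation counting documents per name could look like the following; the name_count label and the name.keyword field are assumptions about the indexed data, and how the procedure embeds this body into the final request is up to the module:

{
  "aggs": {
    "name_count": {
      "terms": { "field": "name.keyword" }
    }
  }
}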

Example

The example below demonstrates all of the module's features, from connecting to the Elasticsearch instance to synchronizing Memgraph and Elasticsearch using triggers.

Connect and populate

Use the following query to connect to the Elasticsearch instance and populate the database:

CALL elastic_search_serialization.connect("https://localhost:9200", "http_ca.crt", "<ELASTIC_USER>","<ELASTIC_PASSWORD>") 
YIELD connection_status
RETURN connection_status;
 
...queries used to populate...
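
The population queries themselves are omitted here. A minimal, hypothetical data set consistent with the rest of the example (nodes named n1 through n6, connected by relationships carrying an edge_property) could be created like this:

CREATE (n1 {name: "n1"}), (n2 {name: "n2"}), (n3 {name: "n3"}),
       (n4 {name: "n4"}), (n5 {name: "n5"}), (n6 {name: "n6"});
MATCH (a {name: "n1"}), (b {name: "n2"})
CREATE (a)-[:CONNECTION {edge_property: "docs"}]->(b);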

Create indexes

Use the following queries to create Elasticsearch indexes:

CALL elastic_search_serialization.create_index("docs_nodes", "node_index_path_schema.json",
{analyzer: "mem_analyzer", index_type: "vertex"}) 
YIELD message_status
RETURN message_status;

CALL elastic_search_serialization.create_index("docs_edges", "edge_index_path_schema.json", {analyzer: "mem_analyzer", index_type: "edge"}) 
YIELD message_status
RETURN message_status;

Index database

Serialize all nodes and relationships in the Memgraph database to the Elasticsearch instance:

CALL elastic_search_serialization.index_db("docs_nodes", "docs_edges", 4)
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;

Scan

Fetch documents from the index specified by index_name that match the query:

CALL elastic_search_serialization.scan("docs_nodes", "{\"query\": {\"match_all\": {}}}", "5m", false, false, 100, 0, 2.0, False)
YIELD documents
RETURN documents;

The scan returns the documents matched by the query.

Create a trigger

Use the following query to create a trigger that will index new data as it is created:

CREATE TRIGGER elastic_search_create
ON CREATE AFTER COMMIT EXECUTE
CALL elastic_search_serialization.index(createdObjects, "docs_nodes", "docs_edges")
YIELD number_of_nodes, number_of_edges
RETURN number_of_nodes, number_of_edges;

Insert a new node and relationship

Use the following query to insert a node and a relationship which will trigger the execution of a procedure:

CREATE (n7 {name: "n7"});
MATCH (n6 {name: "n6"}), (n7 {name: "n7"})
CREATE (n6)-[:NEW_CONNECTION {edge_property: "docs"}]->(n7);

Search

CALL elastic_search_serialization.search("docs_nodes",  "{\"match_all\": {}}", 1000, 0)
YIELD documents
RETURN documents;

The search returns the matching documents, now including the newly inserted data.