Data migrationBest practices

Import best practices

In most cases, the shortest path to import data into Memgraph is from a CSV file using the LOAD CSV clause. This is the best approach, regardless of whether you are migrating from a relational or a graph database.

Memgraph uses Cypher as the query language, so if the database you are replacing with Memgraph allows you to export a set of Cypher queries in a file, you can use the file to import data directly into Memgraph. To achieve the best performance in terms of speed, execute queries programmatically from a client. This approach is also useful when integrating Memgraph into a much larger system where you also need to implement an import process.

The following best practices are ideal for in-memory storage modes. For the best throughput in on-disk storage mode, use the edge import mode. Switching from an in-memory to the on-disk storage mode is only allowed if the database is empty.

Power up your import

First, create indexes

In Memgraph, indexes are not created in advance and creating constraints does not imply index creation. By creating label or label-property indexes, relationships between nodes will be created much faster and consequently speed up data import. A good rule of thumb is to create indexes for the label or label and property being matched in Cypher queries. Here are the example Cypher queries:

CREATE (:Person {id: 1});
CREATE (:Person {id: 2});
 
MATCH (p1:Person {id: 1}), (p2:Person {id:2})
CREATE (p1)-[:IS_FRIENDS_WITH]->(p2);

Based on the above queries, it makes sense to create a label-property index :Person(id) for faster matching and relationship creation. The best practice is to create and use an index on properties containing many distinct (ideally unique) integer values, such as identification numbers. Choosing the right data to create indexes on is important, as indexing all the content will not improve the database speed. Indexing won’t bring any improvements on a low-cardinality property such as gender.

Be aware that creating a label-property index will not create a label index. If there is a need to optimize queries that fetch nodes by label, create a label index, too. A transaction which creates index or constraint will prevent other transactions from running, because such transactions take a unique lock for the whole duration of the transaction.

There are some downsides to indexing too. Each index requires extra storage (takes up RAM memory), which can greatly impact a large dataset. Besides that, indexes slow down write operations to the database because the structures in the index are dynamically updated on modifications or insertions of new nodes. That means you should avoid creating indexes on properties that are being updated frequently.

Use in-memory analytical storage mode

Memgraph supports three storage modes: IN_MEMORY_TRANSACTIONAL, IN_MEMORY_ANALYTICAL and ON_DISK_TRANSACTIONAL To speed up import, use IN_MEMORY_ANALYTICAL storage mode.

Data import is faster in IN_MEMORY_ANALYTICAL storage mode because Delta objects are disabled there, which ensures there is no memory overhead. The Delta objects track all changes, and that requires a lot of memory in case of frequent updates. Not having Deltas when importing data, speeds up the process significantly.

The Delta objects are crucial to ensure atomicity, consistency, isolation and durability. If you are following instructions from LOAD CSV and Cypher queries best practices, your import process does not need to depend on ACID properties, and it is safe to use IN_MEMORY_ANALYTICAL storage mode, even in the concurrent import scenarios.

Once you’re done with the data import process, if you have write-enabled workloads you should consider switching back to the IN_MEMORY_TRANSACTIONAL mode, which favors strongly-consistent ACID transactions. On a larger scale import, if you stick with the default IN_MEMORY_TRANSACTIONAL storage mode, you’ll experience slower import time and spend more resources during data import because of Delta objects.

LOAD CSV best practices

The LOAD CSV clause best performs when run in batches in parallel in IN_MEMORY_ANALYTICAL storage mode with properly set indexes. If you have data up to a few millions of nodes and relationships, it is good enough to run a simple LOAD CSV query without the need to batch and parallelize it, resulting in import via single thread/core on your machine. When migrating from a relational database, export the tables as CSV files, model the graph and create LOAD CSV queries. When migrating from a graph database, you can also export the graph into CSV files and run LOAD CSV queries to import the data.

Batching the CSV file enables you to run multiple LOAD CSV commands in parallel, which utilizes hardware cores. You should consider batching the CSV files if you have more than 10 million graph objects.

You need to store nodes and relationships in separate CSV files. The preparation work for batching includes splitting those files into smaller CSV files. The size of each CSV file (batch) depends on your hardware specifications and the total amount of data you are trying to import.

From the data perspective, if you have 100 milion nodes, it make sense to split the nodes into batches of 10 or 5 milion nodes per batch. You want to avoid having too large files, since loading a single file with 100 milion nodes can create significant memory overhead in multiple scenarios.

From the hardware perspective, if you have a 32-core CPU, it makes sense to split CSV file into 32 or more batches to fully utilize your machine. If you have fewer files, you won’t put all threads to work, and consequently, you won’t achieve the best import speed.

To use CSV batching, Memgraph needs to be in the IN_MEMORY_ANALYTICAL storage mode, since conflicting transactions error will interrupt the process otherwise. If you are running Memgraph in the IN_MEMORY_TRANSACTIONAL mode, you will be able to utilize just a single core, and the import process will be much slower and it will use more memory because of Delta object explanied in section above.

Example of batched parallel import with LOAD CSV in Python

Assuming that the files have been correctly split, and Memgraph is running in IN_MEMORY_ANALYTICAL mode, the import process can start. If you’re running Memgraph with Docker, you need to first copy the CSV files into the container where Memgraph is runing. Based on the paths of the files, you can generate the queries, each using a different file from the total pool of files.

target_nodes_directory = Path(__file__).parents[3].joinpath(f"datasets/graph500/{size}/csv_node_chunks")
for file in target_nodes_directory.glob("*.csv"):
    subprocess.run(["docker", "cp", str(file), f"memgraph:/usr/lib/memgraph/{file.name}"], check=True)
 
queries = []
for file in target_nodes_directory.glob("*.csv"):
    queries.append(f"LOAD CSV FROM '/usr/lib/memgraph/{file.name}' WITH HEADER AS row CREATE (n:Node {{id: row.id}})")

Once the files and queries are in place, you can start with the concurrent query execution using multiprocessing. By running the query in separate processes and opening a new connection to the database in each process, you are running LOAD CSV in parallel. Keep in mind that the number of processes running depends on your hardware availability. Generally, it should be close to the number of threads on the machine. Going above that number will not bring any perormance improvents, and it can even slow down the import process.

Here is the code that spans ten different processes, each running a separate CSV file via separate sessions and in parallel:

with multiprocessing.Pool(10) as pool:
    pool.starmap(execute_csv_chunk, [(q, ) for q in queries])
 
 
#Rest of the code...
 
def execute_csv_chunk(query):
    try:
        driver = GraphDatabase.driver(HOST_PORT, auth=("", ""))
        with driver.session() as session:
            session.run(query)
    except:
        print("Failed to execute transaction")
        raise e

Multiprocessing, rather than multithreading, was used in the example above because of the Global Interpreter Lock (GIL) in Python. The example above can import one million nodes or relationships per second, depending on the hardware, graph structure and the number of node or relationship properties.

The important factor is that all nodes need to be imported first. After the last file with nodes is imported, the relationships can be imported in the same way.

When to use CSV file import tool?

In Memgraph Lab, there is an option of effortlessly importing your data using the CSV file import tool. This tool allows you to import your data by configuring schema, nodes and relationships with only few clicks.

Users with a smaller dataset (typically files up to 10MB in size) or those less familiar with Cypher queries, will benefit the most from leveraging the CSV file import tool. However, for optimal speed and performance, it’s still recommended to use the LOAD CSV Cypher clause. Using the LOAD CSV Cypher clause eliminates the additional steps in uploading files to Memgraph and communicating with the database.

Users with a larger dataset can still make use of the CSV import tool by configuring their dataset and utilizing the code generated by the tool. Simply copy and run the generated code directly in the Query execution window, enhancing your overall database performance.

Cypher queries best practices

If large dataset files are not in the CSV format, any type of dataset file can be easily transformed into Cypher queries that can create the graph based on the dataset files. It is the most flexible way to import any type of data into Memgraph, but it requires a bit of work.

To start, the dataset file needs to be read in the programming language of choice, and they should be split into the nodes and edges files.

Let’s assume the dataset file describing relationships looks like the following:

// Relationships.txt
2 1
3 1
4 1
6 1
29 1
43 1
44 1
51 1
53 1
87 1

Each line describes the two node ids that represent an adjacent relationship type between them.

The key is to transform the chunk of relationships into a list that will be passed to a single query as an argument. Here is an example:

#Code for reading the lines omitted 
create_relationships.append({"a": int(node_source), "b": int(node_sink)})
if len(create_relationships) == CHUNK_SIZE:
    print("Chunk size reached - adding to chunks ...", len(create_relationships))
    chunks.append(create_relationships)
    create_relationships = []

The big file of relationships is being chunked into the list between 10K and 100K elements. The exact number of elements should depend on available hardware.

After that, each relationship pair is in the list and can be expanded into the query with UNWIND command:

query = """
    WITH $batch AS nodes
    UNWIND nodes AS node
    MATCH (a:Node {id: node.a}), (b:Node {id: node.b}) CREATE (a)-[:RELATIONSHIP]->(b)
    """

This part is important since you run a single query and transaction to create 10K or 100K relationships. Running a batch of CREATE statements in a single transaction will be much slower than using UNWIND, and it is even worse if you run a single transaction for each CREATE statement.

Each chunk of relationships, in this case, can then be run in parallel. To do that, you should run each chunk as a separate process. Here is an example:

with multiprocessing.Pool(10) as pool:
    pool.starmap(process_chunk, [(query, chunk) for chunk in chunks])

Multiprocessing, rather than multithreading, was used in the example above because of the Global Interpreter Lock (GIL) in Python.

Depending on the storage mode in which Memgraph is running, the following steps will differ a bit. The fastest and least resource-intensive way is by using the in-memory analytical storage mode. It is also recommended for running an import via Cypher.

If you are running Cypher import in IN_MEMORY_ANALYTICAL mode, your function for executing chunk of nodes or relationships looks like following:

def process_chunk_managed_API(query, create_list):
    driver = GraphDatabase.driver(HOST_PORT, auth=("", ""))
    with driver.session(max_transaction_retry_time=180.0, initial_retry_delay=0.2, retry_delay_multiplier=1.1, retry_delay_jitter_factor=0.1) as session:
        session.execute_write(lambda tx: tx.run(query, {"batch": create_list}))
    driver.close()

Notice the individual batch being passed to the query.

If you are running Cypher import in the IN_MEMORY_ANALYTICAL mode and using Memgraph Python driver (pymgclient or GQLAlchemy), the execution of the node queries won’t differ compared to the process_chunk described above. On the other side, if Memgraph is in IN_MEMORY_TRANSACTIONAL mode, the execution of relationship creation queries can run into conflicting transactions error.

If you have a file with Cypher queries from another graph database, you can tweak it to fit Memgraph’s Cypher syntax and import it into Memgraph.

How to handle conflicting transactions error

If you’re importing data concurrently in the IN_MEMORY_TRANSACTIONAL storage mode, you can expect Cannot resolve conflicting transactions error to occur. This happens because your code tries to concurrently update the same graph object which is locked while being created or updated. This error is expected and it happens because of ACID guarantees in the IN_MEMORY_TRANSACTIONAL storage.

The dataset structure will highly influence the probability of having conflicting transactions during relationships import. Since nodes are separate entities, this error won’t appear during node import. On the other hand, each relationship connects two nodes, so if you have a supernode (a node connected to many other nodes in the dataset), parallel relationship writing on the supernode will cause conflicts. The more connected the node is, the more serial the performance will be with more conflicts.

There are two most common approaches to handle conflicting transactions error:

1. Switch to in-memory analytical storage mode

You won’t experience conflicting transactions error in the IN_MEMORY_ANALYTICAL storage mode. Still, it is important you ensure that there are no concurrent transactions updating the same graph object property. During import, you can ensure this by first importing nodes and then relationships. On node import it is ideal to have CREATE queries so if you run such queries concurrently, none of them updates the same graph object property. Adding relationships to a relationships vector inside the node object is a thread safe operation, so there are no concerns as well, but just make sure you don’t update the property of the same relationship concurrently. It is important not to mix concurrent import of nodes and relationships in this storage mode. If you do that, a relationship can end up not being created, because its start and end nodes which you’re trying to match might not yet be created.

2. Catch the error and retry the transaction with backoff

If you can’t switch to IN_MEMORY_ANALYTICAL storage mode and can’t avoid concurrent updates to the same graph objects, then you’ll have to retry the transactions which throw the conflicting transactions error.

def process_chunk(query, create_list, max_retries=100, initial_wait_time=0.200, backoff_factor=1.1, jitter=0.1):
    session = GraphDatabase.driver(HOST_PORT, auth=("", "")).session()
    for attempt in range(max_retries):
        try:
            with session.begin_transaction() as tx:
                tx.run(query, {"batch": create_list})
                tx.commit()
                break
        except TransientError as te:
            jitter = random.uniform(0, jitter) * initial_wait_time 
            wait_time = initial_wait_time * (backoff_factor ** attempt) + jitter
            print(f"Commit failed on attempt {attempt+1}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
        except Exception as e:
            print(f"Failed to execute transaction: {e}")
            session.close()
            raise e

The execute needs to be in the try-except block due to possible exceptions from conflicting transactions error. If that error happens, the transaction run should be retried, but with the backoff_factor starting based on the initial_wait_time in seconds.

The important arguments to consider here are the inital_wait_time and backoff_factor. The initial time should not be too long, approximately the duration of the batch commit. The backoff_factor should be small and increase slowly since conflicts can occur frequently. It should never be logarithmic since it will fire up to minutes quickly. The jitter is added to avoid the retries at a similar time.

The code snippet above is using an explicit transaction, which is necessary for handling the conflicts. They can also be handled with the managed transactions. Implicit (autocommit) transactions are not recommended for handing the conflicts.

I have both CSV and Cypher queries. Which should I use?

The LOAD CSV with batches and running on multi-core machine will yield a much better performance and will use less resources then the process of loading via the Cypher queries. The process of loading data should require less effort via LOAD CSV.

On the other hand, if you want to parse and change your data in any way, before it reaches Memgraph, it is much easier to do it in the programming language then to process all CSV files based on some rules. Cypher provides a bit more flexibility in expressing what you want, but it comes at the cost of slower performance and more effort.

What to do with other data types?

Other common data types are also used to migrate data from one system to another, such as JSON, Parquet, ORC or IPC/Feather/Arrow files. Memgraph offers out-of-the-box solutions for those types of data, but for the fastest import, using the LOAD CSV clause or Cypher queries is still recommended. Besides storing data in files, you might also have data coming in from a streaming data platform, such as Kafka, RedPanda or Pulsar. In that case, Memgraph has built-in consumers to ingest the data properly and uses procedures called transformations to convert incoming data into a Cypher query that will create data in Memgraph.

Do I lose my data if I restart the database?

Although Memgraph is an in-memory database, the data is persistent and durable. This means data will not be lost upon reboot. Memgraph uses two mechanisms to ensure the durability of stored data and make disaster recovery possible:

  • write-ahead logging (WAL)
  • periodic snapshot creation

Each database modification is recorded in a log file before being written to the database. Therefore, the log file contains all steps needed to reconstruct the DB’s most recent state. Memgraph also periodically takes snapshots during runtime to write the entire data storage to the drive. On startup, the database state is recovered from the most recent snapshot file. The timestamp of the snapshot is compared with the latest update recorded in the WAL file and, if the snapshot is less recent, the state of the DB will be completely recovered using the WAL file. If you are using Memgraph with Docker, be sure to specify a volume for data persistence.

These files are stored on disk, which is why you’ll notice memory used on your disk as well. If the disk storage seems too high, that might be because, by default, Memgraph stores the three most recent snapshots, defined with --storage-snapshot-retention-count flag. The snapshots are, by default, created every 300 seconds, meaning that with an extra large dataset (in terabytes of RAM), it is possible that one snapshot is still being created when another one had just started creating. To avoid that, configure --storage-snapshot-interval setting to a large enough period.

How to efficiently delete everything?

Matching nodes and then deleting relationships attached to them can consume a lot of memory in larger datasets (>1M). This is due to the accumulation of Deltas, which store changes to the graph objects.

Periodic execution

To efficiently keep the memory low during query execution for a single query, refer to periodic execution.

Efficiently deleting everything manually

To efficiently drop the database, first delete all relationships and then all nodes. To delete the relationships, execute the query below repeatedly until the number of deleted relationships is 100,000.

MATCH ()-[r]->()
WITH r
LIMIT 100000
DELETE r
RETURN count(r) AS num_deleted;

After deleting all relationships, run the following query repeatedly until the number of deleted nodes is 100,000 to delete all nodes:

MATCH (n)
WITH n
LIMIT 100000
DELETE n
RETURN count(n) AS num_deleted;

If the deletion still consumes too much memory, consider lowering the batch size limit. If you notice memory still being to high, it can be due to the high value of the --storage-gc-cycle-sec flag. That means that Memgraph’s garbage collector potentially still didn’t deallocate unused objects and free the memory. You can free up memory by running the FREE MEMORY query.

Monitoring memory during import

Often times, the user needs to batch transactions in the IN_MEMORY_TRANSACTIONAL storage mode. This is due to large-update transactions that are creating a lot of Delta objects during the transaction lifetime. Delta objects are necessary for reverting the query if an error during query execution happens. They are approximately of 56 bytes in size, but large-update transactions can make the number of delta objects be huge - therefore resulting in Memgraph getting out of memory.

We have exposed the peak_memory_res (peak resident memory) variable in the SHOW STORAGE INFO; command. with which you can monitor when the peak resident memory rises in the system. It will help you diagnose bottlenecks and high-memory queries which can be optimized. That is most common in import queries because users would want to import the whole dataset with one command only.

For more information, check our storage memory usage. For more information about Delta objects, check the information on the IN_MEMORY_TRANSACTIONAL storage mode.