Import best practices
In most cases, the shortest path to import data into Memgraph is from a CSV file using the LOAD CSV clause. This is the best approach, regardless of whether you are migrating from a relational or a graph database.
Memgraph uses Cypher as the query language, so if the database you are replacing with Memgraph allows you to export a set of Cypher queries in a file, you can use the file to import data directly into Memgraph. To achieve the best performance in terms of speed, execute queries programmatically from a client. This approach is also useful when integrating Memgraph into a much larger system where you also need to implement an import process.
The following best practices are ideal for in-memory storage modes. For the best throughput in on-disk storage mode, use the edge import mode. Switching from an in-memory to the on-disk storage mode is only allowed if the database is empty.
Power up your import
First, create indexes
In Memgraph, indexes are not created in advance, and creating constraints does not imply index creation. Creating label or label-property indexes speeds up node matching and, with it, relationship creation, which consequently speeds up data import. A good rule of thumb is to create indexes for the label or label-property combination being matched in Cypher queries. Here are example Cypher queries:
CREATE (:Person {id: 1});
CREATE (:Person {id: 2});
MATCH (p1:Person {id: 1}), (p2:Person {id:2})
CREATE (p1)-[:IS_FRIENDS_WITH]->(p2);
Based on the above queries, it makes sense to create a label-property index on :Person(id) for faster matching and relationship creation. The best practice is to create and use an index on properties containing many distinct (ideally unique) integer values, such as identification numbers. Choosing the right data to create indexes on is important, as indexing all the content will not improve the database speed. Indexing won't bring any improvement on a low-cardinality property such as gender.
Be aware that creating a label-property index will not create a label index. If you need to optimize queries that fetch nodes by label, create a label index, too. A transaction that creates an index or a constraint will prevent other transactions from running, because such a transaction takes a unique lock for its whole duration.
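For the example above, the label index and the label-property index can be created with the following statements:
CREATE INDEX ON :Person;
CREATE INDEX ON :Person(id);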
There are some downsides to indexing, too. Each index requires extra storage (it takes up RAM), which can have a significant impact on a large dataset. Besides that, indexes slow down write operations, because the index structures are dynamically updated on modifications or insertions of new nodes. That means you should avoid creating indexes on properties that are updated frequently.
Use in-memory analytical storage mode
Memgraph supports three storage modes: IN_MEMORY_TRANSACTIONAL, IN_MEMORY_ANALYTICAL and ON_DISK_TRANSACTIONAL. To speed up import, use the IN_MEMORY_ANALYTICAL storage mode.
Data import is faster in the IN_MEMORY_ANALYTICAL storage mode because Delta objects are disabled in it, which removes their memory overhead. Delta objects track all changes, and that requires a lot of memory in the case of frequent updates. Not having Deltas when importing data speeds up the process significantly.
The Delta objects are crucial to ensure atomicity, consistency, isolation and durability. If you are following the instructions from the LOAD CSV and Cypher queries best practices, your import process does not need to depend on ACID properties, and it is safe to use the IN_MEMORY_ANALYTICAL storage mode, even in concurrent import scenarios. Once you're done with the data import process, if you have write-enabled workloads, consider switching back to the IN_MEMORY_TRANSACTIONAL mode, which favors strongly consistent ACID transactions. On a larger-scale import, if you stick with the default IN_MEMORY_TRANSACTIONAL storage mode, you'll experience slower import times and spend more resources during data import because of Delta objects.
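A minimal sketch of that workflow, using Memgraph's storage mode queries:
STORAGE MODE IN_MEMORY_ANALYTICAL;
// run the import queries here
STORAGE MODE IN_MEMORY_TRANSACTIONAL;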
LOAD CSV best practices
The LOAD CSV clause performs best when run in batches in parallel in the IN_MEMORY_ANALYTICAL storage mode with properly set indexes. If you have up to a few million nodes and relationships, it is good enough to run a simple LOAD CSV query without batching and parallelizing it, resulting in an import via a single thread/core on your machine. When migrating from a relational database, export the tables as CSV files, model the graph and create LOAD CSV queries. When migrating from a graph database, you can also export the graph into CSV files and run LOAD CSV queries to import the data.
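As an illustration, a simple single-threaded import could look like the following, assuming hypothetical people.csv and friendships.csv files with id, name, person1_id and person2_id columns:
LOAD CSV FROM "/path/to/people.csv" WITH HEADER AS row
CREATE (p:Person {id: row.id, name: row.name});

LOAD CSV FROM "/path/to/friendships.csv" WITH HEADER AS row
MATCH (p1:Person {id: row.person1_id}), (p2:Person {id: row.person2_id})
CREATE (p1)-[:IS_FRIENDS_WITH]->(p2);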
Batching the CSV file enables you to run multiple LOAD CSV commands in parallel, which utilizes the hardware cores. You should consider batching the CSV files if you have more than 10 million graph objects.
You need to store nodes and relationships in separate CSV files. The preparation work for batching includes splitting those files into smaller CSV files. The size of each CSV file (batch) depends on your hardware specifications and the total amount of data you are trying to import.
From the data perspective, if you have 100 million nodes, it makes sense to split the nodes into batches of 10 or 5 million nodes per batch. You want to avoid files that are too large, since loading a single file with 100 million nodes can create significant memory overhead in multiple scenarios.
From the hardware perspective, if you have a 32-core CPU, it makes sense to split the CSV file into 32 or more batches to fully utilize your machine. If you have fewer files, you won't put all threads to work, and consequently, you won't achieve the best import speed.
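For reference, here is a minimal Python sketch of the splitting step; the input file name nodes.csv, the chunk size and the output directory are illustrative:
import csv
from pathlib import Path

CHUNK_SIZE = 5_000_000
output_dir = Path("csv_node_chunks")
output_dir.mkdir(exist_ok=True)

with open("nodes.csv", newline="") as source:
    reader = csv.reader(source)
    header = next(reader)
    chunk, chunk_index = [], 0
    for row in reader:
        chunk.append(row)
        if len(chunk) == CHUNK_SIZE:
            # Write a full chunk, repeating the header in every file
            with open(output_dir / f"nodes_{chunk_index}.csv", "w", newline="") as out:
                writer = csv.writer(out)
                writer.writerow(header)
                writer.writerows(chunk)
            chunk, chunk_index = [], chunk_index + 1
    if chunk:
        # Write the remaining rows as the last chunk
        with open(output_dir / f"nodes_{chunk_index}.csv", "w", newline="") as out:
            writer = csv.writer(out)
            writer.writerow(header)
            writer.writerows(chunk)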
To use CSV batching, Memgraph needs to be in the IN_MEMORY_ANALYTICAL storage mode, since the conflicting transactions error would otherwise interrupt the process. If you are running Memgraph in the IN_MEMORY_TRANSACTIONAL mode, you will be able to utilize just a single core, and the import process will be much slower and use more memory because of the Delta objects explained in the section above.
Example of batched parallel import with LOAD CSV in Python
Assuming that the files have been correctly split and Memgraph is running in IN_MEMORY_ANALYTICAL mode, the import process can start. If you're running Memgraph with Docker, you first need to copy the CSV files into the container where Memgraph is running. Based on the paths of the files, you can generate the queries, each using a different file from the total pool of files.
from pathlib import Path
import subprocess

# "size" is assumed to be defined earlier in the script
target_nodes_directory = Path(__file__).parents[3].joinpath(f"datasets/graph500/{size}/csv_node_chunks")
# Copy each CSV chunk into the running Memgraph Docker container
for file in target_nodes_directory.glob("*.csv"):
    subprocess.run(["docker", "cp", str(file), f"memgraph:/usr/lib/memgraph/{file.name}"], check=True)
# Generate one LOAD CSV query per chunk
queries = []
for file in target_nodes_directory.glob("*.csv"):
    queries.append(f"LOAD CSV FROM '/usr/lib/memgraph/{file.name}' WITH HEADER AS row CREATE (n:Node {{id: row.id}})")
Once the files and queries are in place, you can start the concurrent query execution using multiprocessing. By running each query in a separate process and opening a new connection to the database in each process, you are running LOAD CSV in parallel. Keep in mind that the number of processes you run depends on your hardware availability. Generally, it should be close to the number of threads on the machine. Going above that number will not bring any performance improvements, and it can even slow down the import process.
Here is the code that spawns ten different processes, each running a separate CSV file via a separate session, in parallel:
import multiprocessing

from neo4j import GraphDatabase

# HOST_PORT and queries are assumed to be defined earlier in the script
def execute_csv_chunk(query):
    try:
        # Each process opens its own connection to Memgraph
        driver = GraphDatabase.driver(HOST_PORT, auth=("", ""))
        with driver.session() as session:
            session.run(query)
        driver.close()
    except Exception as e:
        print("Failed to execute transaction")
        raise e

with multiprocessing.Pool(10) as pool:
    pool.starmap(execute_csv_chunk, [(q,) for q in queries])
Multiprocessing, rather than multithreading, was used in the example above because of the Global Interpreter Lock (GIL) in Python. The example above can import one million nodes or relationships per second, depending on the hardware, graph structure and the number of node or relationship properties.
The important factor is that all nodes need to be imported first. After the last file with nodes is imported, the relationships can be imported in the same way.
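For illustration, the relationship chunks can be handled the same way; the directory name, the CSV column names (source, target) and the relationship type below are hypothetical:
target_relationships_directory = Path(__file__).parents[3].joinpath(f"datasets/graph500/{size}/csv_relationship_chunks")
# Copy the relationship chunks into the container, just like the node chunks
for file in target_relationships_directory.glob("*.csv"):
    subprocess.run(["docker", "cp", str(file), f"memgraph:/usr/lib/memgraph/{file.name}"], check=True)

relationship_queries = []
for file in target_relationships_directory.glob("*.csv"):
    relationship_queries.append(
        f"LOAD CSV FROM '/usr/lib/memgraph/{file.name}' WITH HEADER AS row "
        f"MATCH (a:Node {{id: row.source}}), (b:Node {{id: row.target}}) "
        f"CREATE (a)-[:CONNECTED_TO]->(b)"
    )

# Run the relationship queries through the same pool as the node queries
with multiprocessing.Pool(10) as pool:
    pool.starmap(execute_csv_chunk, [(q,) for q in relationship_queries])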
When to use the CSV file import tool?
In Memgraph Lab, there is an option to effortlessly import your data using the CSV file import tool. This tool allows you to import your data by configuring the schema, nodes and relationships with only a few clicks.
Users with a smaller dataset (typically files up to 10 MB in size) or those less familiar with Cypher queries will benefit the most from the CSV file import tool. However, for optimal speed and performance, it's still recommended to use the LOAD CSV Cypher clause. Using the LOAD CSV Cypher clause eliminates the additional steps of uploading files to Memgraph and communicating with the database.
Users with a larger dataset can still make use of the CSV import tool by configuring their dataset in it and taking the code the tool generates. Simply copy and run the generated code directly in the Query execution window for better overall performance.
Cypher queries best practices
If your large dataset files are not in the CSV format, any type of dataset file can be transformed into Cypher queries that create the graph from it. This is the most flexible way to import any type of data into Memgraph, but it requires a bit of work.
To start, the dataset files need to be read in the programming language of your choice and split into node and relationship files.
Let’s assume the dataset file describing relationships looks like the following:
// Relationships.txt
2 1
3 1
4 1
6 1
29 1
43 1
44 1
51 1
53 1
87 1
Each line contains the IDs of two adjacent nodes, that is, two nodes connected by a relationship.
The key is to transform the chunk of relationships into a list that will be passed to a single query as an argument. Here is an example:
# Code for reading the lines omitted; node_source and node_sink come from each parsed line
create_relationships.append({"a": int(node_source), "b": int(node_sink)})
# Once the chunk reaches CHUNK_SIZE elements, store it and start a new one
if len(create_relationships) == CHUNK_SIZE:
    print("Chunk size reached - adding to chunks ...", len(create_relationships))
    chunks.append(create_relationships)
    create_relationships = []
The big file of relationships is chunked into lists of between 10K and 100K elements. The exact number of elements should depend on the available hardware.
After that, each relationship pair in the list can be expanded into the query with the UNWIND clause:
query = """
WITH $batch AS nodes
UNWIND nodes AS node
MATCH (a:Node {id: node.a}), (b:Node {id: node.b}) CREATE (a)-[:RELATIONSHIP]->(b)
"""
This part is important, since you run a single query and transaction to create 10K or 100K relationships. Running a batch of CREATE statements in a single transaction will be much slower than using UNWIND, and it is even worse if you run a single transaction for each CREATE statement.
Each chunk of relationships, in this case, can then be run in parallel. To do that, you should run each chunk as a separate process. Here is an example:
with multiprocessing.Pool(10) as pool:
    pool.starmap(process_chunk, [(query, chunk) for chunk in chunks])
Multiprocessing, rather than multithreading, was used in the example above because of the Global Interpreter Lock (GIL) in Python.
Depending on the storage mode in which Memgraph is running, the following steps will differ a bit. The fastest and least resource-intensive way is to use the in-memory analytical storage mode, which is also the recommended mode for running an import via Cypher.
If you are running the Cypher import in IN_MEMORY_ANALYTICAL mode, your function for executing a chunk of nodes or relationships looks like the following:
def process_chunk_managed_API(query, create_list):
    driver = GraphDatabase.driver(HOST_PORT, auth=("", ""))
    with driver.session(max_transaction_retry_time=180.0, initial_retry_delay=0.2, retry_delay_multiplier=1.1, retry_delay_jitter_factor=0.1) as session:
        session.execute_write(lambda tx: tx.run(query, {"batch": create_list}))
    driver.close()
Notice the individual batch being passed to the query.
If you are running the Cypher import in the IN_MEMORY_ANALYTICAL mode and using a Memgraph Python driver (pymgclient or GQLAlchemy), the execution of the node queries won't differ from the process_chunk described above. On the other hand, if Memgraph is in IN_MEMORY_TRANSACTIONAL mode, the execution of relationship creation queries can run into the conflicting transactions error.
If you have a file with Cypher queries from another graph database, you can tweak it to fit Memgraph's Cypher syntax and import it into Memgraph.
How to handle the conflicting transactions error
If you're importing data concurrently in the IN_MEMORY_TRANSACTIONAL storage mode, you can expect the Cannot resolve conflicting transactions error to occur. This happens because your code tries to concurrently update the same graph object, which is locked while being created or updated. This error is expected, and it happens because of the ACID guarantees in the IN_MEMORY_TRANSACTIONAL storage mode.
The dataset structure highly influences the probability of hitting conflicting transactions during relationship import. Since nodes are separate entities, this error won't appear during node import. On the other hand, each relationship connects two nodes, so if you have a supernode (a node connected to many other nodes in the dataset), parallel relationship writes on the supernode will cause conflicts. The more connected a node is, the more conflicts occur and the more serialized the writes become.
These are the two most common approaches to handling the conflicting transactions error:
1. Switch to in-memory analytical storage mode
You won't experience the conflicting transactions error in the IN_MEMORY_ANALYTICAL storage mode. Still, it is important to ensure that no concurrent transactions update the same graph object property. During import, you can ensure this by first importing nodes and then relationships. On node import, it is ideal to use CREATE queries, so that even when run concurrently, none of them updates the same graph object property. Adding relationships to a relationships vector inside the node object is a thread-safe operation, so there are no concerns there either, but make sure you don't update the property of the same relationship concurrently. It is important not to mix the concurrent import of nodes and relationships in this storage mode. If you do, a relationship can end up not being created, because the start and end nodes you're trying to match might not yet be created.
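For illustration, a minimal sketch of that ordering, assuming node_chunks, relationship_chunks and the corresponding node_query and relationship_query have been prepared as in the earlier examples:
# Import all node chunks first, reusing process_chunk_managed_API from above
with multiprocessing.Pool(10) as pool:
    pool.starmap(process_chunk_managed_API, [(node_query, chunk) for chunk in node_chunks])

# Only after every node chunk has finished, start the relationship import
with multiprocessing.Pool(10) as pool:
    pool.starmap(process_chunk_managed_API, [(relationship_query, chunk) for chunk in relationship_chunks])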
2. Catch the error and retry the transaction with backoff
If you can't switch to the IN_MEMORY_ANALYTICAL storage mode and can't avoid concurrent updates to the same graph objects, then you'll have to retry the transactions which throw the conflicting transactions error.
import random
import time

from neo4j import GraphDatabase
from neo4j.exceptions import TransientError

# HOST_PORT is assumed to be defined earlier in the script
def process_chunk(query, create_list, max_retries=100, initial_wait_time=0.200, backoff_factor=1.1, jitter=0.1):
    session = GraphDatabase.driver(HOST_PORT, auth=("", "")).session()
    for attempt in range(max_retries):
        try:
            # Explicit transaction, so the conflict can be caught and retried
            with session.begin_transaction() as tx:
                tx.run(query, {"batch": create_list})
                tx.commit()
            break
        except TransientError:
            # Conflicting transactions surface as transient errors - back off and retry
            wait_jitter = random.uniform(0, jitter) * initial_wait_time
            wait_time = initial_wait_time * (backoff_factor ** attempt) + wait_jitter
            print(f"Commit failed on attempt {attempt+1}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
        except Exception as e:
            print(f"Failed to execute transaction: {e}")
            session.close()
            raise e
    session.close()
The transaction execution needs to be in a try-except block because of the possible exceptions coming from the conflicting transactions error. If that error happens, the transaction run should be retried, with the backoff_factor applied starting from the initial_wait_time in seconds.
The important arguments to consider here are the initial_wait_time and backoff_factor. The initial wait time should not be too long, approximately the duration of a batch commit. The backoff_factor should be small and increase slowly, since conflicts can occur frequently; if the backoff grows too aggressively, the wait time quickly climbs to minutes. The jitter is added to avoid retries happening at the same time.
The code snippet above uses an explicit transaction, which is necessary for handling the conflicts manually. Conflicts can also be handled with managed transactions. Implicit (autocommit) transactions are not recommended for handling the conflicts.
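For comparison, a minimal sketch of the managed-transaction variant, in which the driver itself retries transient errors such as conflicting transactions (HOST_PORT, the query and the batch list are assumed to be defined as in the earlier examples):
from neo4j import GraphDatabase

def process_chunk_managed(query, create_list):
    driver = GraphDatabase.driver(HOST_PORT, auth=("", ""))
    # execute_write runs a managed transaction: transient errors are retried
    # automatically with the driver's own backoff, so no manual retry loop is needed
    with driver.session() as session:
        session.execute_write(lambda tx: tx.run(query, {"batch": create_list}))
    driver.close()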
I have both CSV and Cypher queries. Which should I use?
LOAD CSV, run in batches on a multi-core machine, will yield much better performance and use fewer resources than loading via Cypher queries. The loading process should also require less effort with LOAD CSV.
On the other hand, if you want to parse and change your data in any way before it reaches Memgraph, it is much easier to do that in a programming language than to process all the CSV files based on some rules. Cypher provides a bit more flexibility in expressing what you want, but it comes at the cost of slower performance and more effort.
What to do with other data types?
Data in other common formats, such as JSON, Parquet, ORC or IPC/Feather/Arrow files, is also used to migrate from one system to another. Memgraph offers out-of-the-box solutions for those formats, but for the fastest import, using the LOAD CSV clause or Cypher queries is still recommended. Besides storing data in files, you might also have data coming in from a streaming data platform, such as Kafka, RedPanda or Pulsar. In that case, Memgraph has built-in consumers to ingest the data properly and uses procedures called transformations to convert incoming data into a Cypher query that will create data in Memgraph.
Do I lose my data if I restart the database?
Although Memgraph is an in-memory database, the data is persistent and durable. This means data will not be lost upon reboot. Memgraph uses two mechanisms to ensure the durability of stored data and make disaster recovery possible:
- write-ahead logging (WAL)
- periodic snapshot creation
Each database modification is recorded in a log file before being written to the database. Therefore, the log file contains all steps needed to reconstruct the DB’s most recent state. Memgraph also periodically takes snapshots during runtime to write the entire data storage to the drive. On startup, the database state is recovered from the most recent snapshot file. The timestamp of the snapshot is compared with the latest update recorded in the WAL file and, if the snapshot is less recent, the state of the DB will be completely recovered using the WAL file. If you are using Memgraph with Docker, be sure to specify a volume for data persistence.
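A minimal example of specifying such a volume, assuming the memgraph/memgraph image and its default /var/lib/memgraph data directory:
docker run -p 7687:7687 -v mg_lib:/var/lib/memgraph memgraph/memgraph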
These files are stored on disk, which is why you'll notice storage being used on your disk as well. If the disk usage seems too high, that might be because, by default, Memgraph keeps the three most recent snapshots, as defined by the --storage-snapshot-retention-count flag. Snapshots are created every 300 seconds by default, meaning that with an extra large dataset (terabytes of RAM), one snapshot might still be in progress when the next one starts. To avoid that, configure the --storage-snapshot-interval-sec setting to a large enough value.
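For example, when running with Docker, the snapshot-related flags can be passed as arguments to the image (the values below are illustrative):
docker run -p 7687:7687 -v mg_lib:/var/lib/memgraph memgraph/memgraph --storage-snapshot-interval-sec=1800 --storage-snapshot-retention-count=2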
How to efficiently delete everything?
Matching nodes and then deleting the relationships attached to them can consume a lot of memory in larger datasets (more than 1M graph objects). This is due to the accumulation of Deltas, which store changes to the graph objects.
Periodic execution
To efficiently keep memory usage low during the execution of a single query, refer to periodic execution.
Efficiently deleting everything manually
To efficiently drop the database, first delete all relationships and then all nodes. To delete the relationships, execute the query below repeatedly until the number of deleted relationships drops to 0:
MATCH ()-[r]->()
WITH r
LIMIT 100000
DELETE r
RETURN count(r) AS num_deleted;
After deleting all relationships, run the following query repeatedly until the number of deleted nodes drops to 0 to delete all the nodes:
MATCH (n)
WITH n
LIMIT 100000
DELETE n
RETURN count(n) AS num_deleted;
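If you are scripting this, a minimal Python sketch of the repeated batched deletion could look like the following (HOST_PORT is assumed to be defined as in the earlier examples):
from neo4j import GraphDatabase

delete_relationships = "MATCH ()-[r]->() WITH r LIMIT 100000 DELETE r RETURN count(r) AS num_deleted"
delete_nodes = "MATCH (n) WITH n LIMIT 100000 DELETE n RETURN count(n) AS num_deleted"

driver = GraphDatabase.driver(HOST_PORT, auth=("", ""))
with driver.session() as session:
    # Delete relationships first, then nodes, repeating each query until nothing is left
    for query in (delete_relationships, delete_nodes):
        while session.run(query).single()["num_deleted"] > 0:
            pass
driver.close()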
If the deletion still consumes too much memory, consider lowering the batch size limit. If you notice memory usage still being too high, it can be due to a high value of the --storage-gc-cycle-sec flag, meaning that Memgraph's garbage collector potentially hasn't yet deallocated unused objects and freed the memory. You can free up memory by running the FREE MEMORY query.
Monitoring memory during import
Users often need to batch transactions in the IN_MEMORY_TRANSACTIONAL storage mode. This is because large-update transactions create a lot of Delta objects during the transaction lifetime. Delta objects are necessary for reverting the query if an error happens during query execution. Each Delta is approximately 56 bytes in size, but large-update transactions can create a huge number of them, which can cause Memgraph to run out of memory.
The peak_memory_res (peak resident memory) variable exposed in the SHOW STORAGE INFO; command lets you monitor when the peak resident memory rises in the system. It will help you diagnose bottlenecks and high-memory queries which can be optimized. That is most common in import queries, because users tend to import the whole dataset with a single command.
For more information, check our storage memory usage. For more information about Delta objects, check the information on the IN_MEMORY_TRANSACTIONAL storage mode.
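A minimal sketch for checking peak resident memory from Python between import batches (HOST_PORT is assumed to be defined as in the earlier examples):
from neo4j import GraphDatabase

driver = GraphDatabase.driver(HOST_PORT, auth=("", ""))
with driver.session() as session:
    # Print every storage info entry; watch how peak_memory_res changes between imports
    for record in session.run("SHOW STORAGE INFO;"):
        print(record.values())
driver.close()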
Do you have more questions?
Schedule a 30 min session with one of our engineers to discuss how Memgraph fits with your architecture. Our engineers are highly experienced in helping companies of all sizes to integrate and get the most out of Memgraph in their projects. Talk to us about data modeling, optimizing queries, defining infrastructure requirements or migrating from your existing graph database. No nonsense or sales pitch, just tech.