View Javadoc
1   package org.neo4j.talend;
2   
3   import org.apache.log4j.Logger;
4   import org.neo4j.graphdb.DynamicLabel;
5   import org.neo4j.graphdb.index.IndexHits;
6   import org.neo4j.helpers.collection.MapUtil;
7   import org.neo4j.index.lucene.unsafe.batchinsert.LuceneBatchInserterIndexProvider;
8   import org.neo4j.unsafe.batchinsert.BatchInserter;
9   import org.neo4j.unsafe.batchinsert.BatchInserterIndex;
10  import org.neo4j.unsafe.batchinsert.BatchInserterIndexProvider;
11  import org.neo4j.unsafe.batchinsert.BatchInserters;
12  
13  import java.io.File;
14  import java.io.IOException;
15  import java.util.HashMap;
16  import java.util.Map;
17  
18  /**
19   * Wrapper on a Neo4j batch inserter 'database'.
20   */
21  public class Neo4jBatchDatabase {
22  
23      /**
24       * The logger
25       */
26      private static Logger log = Logger.getLogger(Neo4jBatchDatabase.class);
27  
28      /**
29       * Name of the field index for the importId
30       */
31      private final static String BACTH_INDEX_ID_NAME = "importId";
32  
33      /**
34       * Batch inserter
35       */
36      private BatchInserter inserter;
37  
38      /**
39       * Batch inserter index provider
40       */
41      private BatchInserterIndexProvider indexProvider;
42  
43      /**
44       * Store the list of index
45       */
46      protected Map<String, BatchInserterIndex> batchInserterIndexes;
47  
48  
49      /**
50       * Constructor that create an embedded database.
51       *
52       * @param path          Path of the graph database folder
53       * @param configuration Configuration map of database
54       */
55      public Neo4jBatchDatabase(String path, Map<String, String> configuration) {
56          try {
57              // Initialize batch inserter
58              this.inserter = BatchInserters.inserter(new File(path), configuration);
59  
60              // Init
61              this.indexProvider = new LuceneBatchInserterIndexProvider(inserter);
62              this.batchInserterIndexes = new HashMap<>();
63  
64          } catch (IOException e) {
65              throw new RuntimeException(e);
66          }
67      }
68  
69  
70      /**
71       * Getter for inserter
72       *
73       * @return The batch inserter
74       */
75      public BatchInserter getInserter() {
76          return this.inserter;
77      }
78  
79      /**
80       * Create and store the index, if it doesn't exist.
81       *
82       * @param indexName      Name of the index
83       * @param indexCacheSize Number of element into the cache size for the index
84       */
85      public void createBatchIndex(String indexName, Integer indexCacheSize) {
86          if (!batchInserterIndexes.containsKey(indexName)) {
87              BatchInserterIndex index = indexProvider.nodeIndex(indexName, MapUtil.stringMap("type", "exact"));
88              if (indexCacheSize > 0) {
89                  index.setCacheCapacity(BACTH_INDEX_ID_NAME, indexCacheSize);
90              }
91              batchInserterIndexes.put(indexName, index);
92          } else {
93              log.trace("Index [" + indexName + "] already exist");
94          }
95      }
96  
97      /**
98       * Flush the specified index on the disk, so it will be available after for search.
99       *
100      * @param indexName Name of the index to flush
101      */
102     public void flushBatchIndex(String indexName) {
103         if (batchInserterIndexes.containsKey(indexName)) {
104             BatchInserterIndex index = batchInserterIndexes.get(indexName);
105             index.flush();
106         } else {
107             log.trace("Flushing a non exist index ... [" + indexName + "]");
108         }
109     }
110 
111     /**
112      * Push a node importId into the batch index.
113      *
114      * @param indexName     The batch index name
115      * @param node          Neo4j identifier
116      * @param importIdValue Import identifier
117      */
118     public void indexNodeInBatchIndex(String indexName, long node, Object importIdValue) {
119         if (batchInserterIndexes.containsKey(indexName)) {
120             BatchInserterIndex index = batchInserterIndexes.get(indexName);
121 
122             Map<String, Object> props = new HashMap<>();
123             props.put(BACTH_INDEX_ID_NAME, importIdValue);
124             index.add(node, props);
125         } else {
126             log.trace("Can't index node " + node + "/" + importIdValue + " into a none existant index [" + indexName + "]");
127         }
128     }
129 
130     /**
131      * Find a node in the index.
132      *
133      * @param indexName Name of the batch index
134      * @param value     Value to search in index
135      * @return Neo4j node identifier
136      */
137     public Long findNodeInBatchIndex(String indexName, Object value) {
138         Long nodeId = null;
139         if (this.batchInserterIndexes.containsKey(indexName)) {
140             IndexHits<Long> result = this.batchInserterIndexes.get(indexName).get(BACTH_INDEX_ID_NAME, value);
141             if (result.size() > 0) {
142                 nodeId = result.getSingle();
143             }
144         } else {
145             log.trace("Can't find object [" + value + "] into index " + indexName);
146         }
147         return nodeId;
148     }
149 
150     /**
151      * Create schema.
152      *
153      * @param type     Type of schema to create (ie. constraint or index on node property)
154      * @param label    Node label
155      * @param property Node property
156      */
157     public void createSchema(String type, String label, String property) {
158         switch (type) {
159             case "NODE_PROPERTY_UNIQUE":
160                 this.inserter.createDeferredConstraint(DynamicLabel.label(label)).assertPropertyIsUnique(property).create();
161                 break;
162             case "NODE_PROPERTY_INDEX":
163                 this.inserter.createDeferredSchemaIndex(DynamicLabel.label(label)).on(property).create();
164                 break;
165             default:
166                 // default is unique constraint
167                 this.inserter.createDeferredConstraint(DynamicLabel.label(label)).assertPropertyIsUnique(property).create();
168                 break;
169         }
170     }
171 
172     /**
173      * Shutdown the graph database
174      */
175     public void shutdown() {
176         if (this.inserter != null) {
177             this.inserter.shutdown();
178         }
179         this.inserter = null;
180 
181         if (this.indexProvider != null) {
182             this.indexProvider.shutdown();
183         }
184         this.indexProvider = null;
185         this.batchInserterIndexes = null;
186     }
187 
188 }