View Javadoc
1   package org.neo4j.talend;
2   
3   import org.apache.commons.lang3.StringUtils;
4   import org.neo4j.graphdb.DynamicLabel;
5   import org.neo4j.graphdb.Label;
6   
7   import java.io.IOException;
8   import java.util.ArrayList;
9   import java.util.Arrays;
10  import java.util.List;
11  import java.util.Map;
12  
13  public class Neo4jBatchInserterNode extends Neo4jBatchInserterAbstract {
14  
15      /**
16       * Field name where to find the list of label
17       */
18      private String labelsField;
19  
20      /**
21       * Batch index name
22       */
23      private String batchIndexName;
24  
25      /**
26       * Name of the field on wich we create the batch index
27       */
28      private String batchIndexFieldName;
29  
30      /**
31       * If true, when we create a node we also save the field with its value.
32       * It is usefull to put it at false if this field is just a technical one for the import.
33       */
34      private Boolean insertIndexFieldName;
35  
36  
37      /**
38       * Constructor
39       *
40       * @param batchDb              Batch inserter database
41       * @param labelsField          Field name where to find labels
42       * @param batchIndexName       Batch index name
43       * @param batchIndexFieldName  Batch index field name
44       * @param insertIndexFieldName If we have to save the index field
45       * @param indexCacheSize       Size of the cache index
46       * @throws IOException
47       */
48      public Neo4jBatchInserterNode(Neo4jBatchDatabase batchDb, String labelsField, String batchIndexName, String batchIndexFieldName, Boolean insertIndexFieldName, Integer indexCacheSize) throws IOException {
49          super(batchDb);
50  
51          // Check required component properties
52          if (StringUtils.isEmpty(batchIndexName)) {
53              throw new RuntimeException("batchIndexName must be defined !!!");
54          }
55          if (StringUtils.isEmpty(batchIndexFieldName)) {
56              throw new RuntimeException("batchIndexFieldName field must be defined !!!");
57          }
58          if (indexCacheSize == null) {
59              throw new RuntimeException("Index cache size must be defined !!!");
60          }
61  
62          // Setting class value
63          this.batchIndexName = batchIndexName;
64          this.batchIndexFieldName = batchIndexFieldName;
65          this.labelsField = labelsField;
66          this.insertIndexFieldName = insertIndexFieldName;
67  
68          // Create the batch index
69          this.batchDb.createBatchIndex(batchIndexName, indexCacheSize);
70      }
71  
72      /**
73       * Insert a node.
74       *
75       * @param incoming   Talend object
76       * @param columnList Attributes list of the Talend object
77       */
78      public void create(Object incoming, List<String> columnList) {
79          try {
80              // Construct the property map
81              Map<String, Object> properties = constructMapFromObject(incoming, columnList);
82  
83              // Import ID
84              Object importId = this.getObjectProperty(incoming, this.batchIndexFieldName);
85  
86              // Label list
87              Label[] labels = this.computeLabels(incoming);
88  
89              // Remove none necessary fields (importId, & label)
90              if (StringUtils.isNotEmpty(this.labelsField)) {
91                  properties.remove(this.labelsField);
92              }
93              if (!this.insertIndexFieldName) {
94                  properties.remove(this.batchIndexFieldName);
95              }
96  
97              // If the node exist, we update it, otherwise we create it
98              Long id = this.batchDb.findNodeInBatchIndex(batchIndexName, importId);
99              if (id != null) {
100 
101                 // update properties
102                 Map<String, Object> props = this.batchDb.getInserter().getNodeProperties(id);
103                 props.putAll(properties);
104                 this.batchDb.getInserter().setNodeProperties(id, props);
105 
106                 // update labels
107                 List<Label> labelList = new ArrayList<>(Arrays.asList(labels));
108                 for (Label lab : this.batchDb.getInserter().getNodeLabels(id)) {
109                     labelList.add(lab);
110                 }
111                 Label[] mergeLabels = labelList.toArray(new Label[labelList.size()]);
112                 this.batchDb.getInserter().setNodeLabels(id, mergeLabels);
113 
114             } else {
115                 // Create the node
116                 long nodeId = this.batchDb.getInserter().createNode(properties, labels);
117 
118                 // Save the node into the index
119                 this.batchDb.indexNodeInBatchIndex(batchIndexName, nodeId, importId);
120             }
121 
122         } catch (IllegalAccessException e) {
123             throw new RuntimeException(e);
124         }
125     }
126 
127     /**
128      * Finish the process.
129      */
130     public void finish() {
131         this.batchDb.flushBatchIndex(batchIndexName);
132     }
133 
134     protected Label[] computeLabels(Object incoming) throws IllegalAccessException {
135         Label[] labels = new Label[0];
136 
137         if (StringUtils.isNotEmpty(this.labelsField)) {
138 
139             String labelList = (String) this.getObjectProperty(incoming, this.labelsField);
140 
141             if (StringUtils.isNotEmpty(labelList)) {
142                 String[] labelTab = labelList.split(";");
143                 labels = new Label[labelTab.length];
144                 for (int i = 0; i < labelTab.length; i++) {
145                     labels[i] = DynamicLabel.label(labelTab[i]);
146                 }
147             }
148         }
149 
150         return labels;
151     }
152 
153 }