1 package org.neo4j.talend;
2
3 import org.apache.commons.lang3.StringUtils;
4 import org.neo4j.graphdb.DynamicLabel;
5 import org.neo4j.graphdb.Label;
6
7 import java.io.IOException;
8 import java.util.ArrayList;
9 import java.util.Arrays;
10 import java.util.List;
11 import java.util.Map;
12
13 public class Neo4jBatchInserterNode extends Neo4jBatchInserterAbstract {
14
15
16
17
18 private String labelsField;
19
20
21
22
23 private String batchIndexName;
24
25
26
27
28 private String batchIndexFieldName;
29
30
31
32
33
34 private Boolean insertIndexFieldName;
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public Neo4jBatchInserterNode(Neo4jBatchDatabase batchDb, String labelsField, String batchIndexName, String batchIndexFieldName, Boolean insertIndexFieldName, Integer indexCacheSize) throws IOException {
49 super(batchDb);
50
51
52 if (StringUtils.isEmpty(batchIndexName)) {
53 throw new RuntimeException("batchIndexName must be defined !!!");
54 }
55 if (StringUtils.isEmpty(batchIndexFieldName)) {
56 throw new RuntimeException("batchIndexFieldName field must be defined !!!");
57 }
58 if (indexCacheSize == null) {
59 throw new RuntimeException("Index cache size must be defined !!!");
60 }
61
62
63 this.batchIndexName = batchIndexName;
64 this.batchIndexFieldName = batchIndexFieldName;
65 this.labelsField = labelsField;
66 this.insertIndexFieldName = insertIndexFieldName;
67
68
69 this.batchDb.createBatchIndex(batchIndexName, indexCacheSize);
70 }
71
72
73
74
75
76
77
78 public void create(Object incoming, List<String> columnList) {
79 try {
80
81 Map<String, Object> properties = constructMapFromObject(incoming, columnList);
82
83
84 Object importId = this.getObjectProperty(incoming, this.batchIndexFieldName);
85
86
87 Label[] labels = this.computeLabels(incoming);
88
89
90 if (StringUtils.isNotEmpty(this.labelsField)) {
91 properties.remove(this.labelsField);
92 }
93 if (!this.insertIndexFieldName) {
94 properties.remove(this.batchIndexFieldName);
95 }
96
97
98 Long id = this.batchDb.findNodeInBatchIndex(batchIndexName, importId);
99 if (id != null) {
100
101
102 Map<String, Object> props = this.batchDb.getInserter().getNodeProperties(id);
103 props.putAll(properties);
104 this.batchDb.getInserter().setNodeProperties(id, props);
105
106
107 List<Label> labelList = new ArrayList<>(Arrays.asList(labels));
108 for (Label lab : this.batchDb.getInserter().getNodeLabels(id)) {
109 labelList.add(lab);
110 }
111 Label[] mergeLabels = labelList.toArray(new Label[labelList.size()]);
112 this.batchDb.getInserter().setNodeLabels(id, mergeLabels);
113
114 } else {
115
116 long nodeId = this.batchDb.getInserter().createNode(properties, labels);
117
118
119 this.batchDb.indexNodeInBatchIndex(batchIndexName, nodeId, importId);
120 }
121
122 } catch (IllegalAccessException e) {
123 throw new RuntimeException(e);
124 }
125 }
126
127
128
129
130 public void finish() {
131 this.batchDb.flushBatchIndex(batchIndexName);
132 }
133
134 protected Label[] computeLabels(Object incoming) throws IllegalAccessException {
135 Label[] labels = new Label[0];
136
137 if (StringUtils.isNotEmpty(this.labelsField)) {
138
139 String labelList = (String) this.getObjectProperty(incoming, this.labelsField);
140
141 if (StringUtils.isNotEmpty(labelList)) {
142 String[] labelTab = labelList.split(";");
143 labels = new Label[labelTab.length];
144 for (int i = 0; i < labelTab.length; i++) {
145 labels[i] = DynamicLabel.label(labelTab[i]);
146 }
147 }
148 }
149
150 return labels;
151 }
152
153 }