1 package org.neo4j.talend;
2
3 import org.apache.commons.lang3.StringUtils;
4 import org.apache.log4j.Logger;
5
6 import org.neo4j.graphdb.DynamicRelationshipType;
7
8 import java.io.IOException;
9 import java.util.List;
10 import java.util.Map;
11
12 public class Neo4jBatchInserterRelationship extends Neo4jBatchInserterAbstract {
13
14 private static Logger log = Logger.getLogger(Neo4jBatchInserterRelationship.class);
15
16 private String relationshipTypeField;
17 private String direction;
18 private String startIndexName;
19 private String startIndexField;
20 private String endIndexName;
21 private String endIndexField;
22
23 private Boolean skipOnError;
24
25
26
27
28 public Neo4jBatchInserterRelationship(Neo4jBatchDatabase graphDb, String relationshipTypeField, String direction,
29 String startIndexName, String startIndexField, String endIndexName, String endIndexField,
30 Boolean skipOnError) throws IOException {
31 super(graphDb);
32
33 if (StringUtils.isEmpty(relationshipTypeField)) {
34 throw new RuntimeException("relationshipTypeField must be defined !!!");
35 }
36 this.relationshipTypeField = relationshipTypeField;
37
38 if (StringUtils.isEmpty(direction)) {
39 throw new RuntimeException("direction field must be defined !!!");
40 }
41 this.direction = direction;
42
43 if (StringUtils.isEmpty(startIndexName)) {
44 throw new RuntimeException("startIndexName field must be defined !!!");
45 }
46 this.startIndexName = startIndexName;
47
48 if (StringUtils.isEmpty(startIndexField)) {
49 throw new RuntimeException("startIndexField field must be defined !!!");
50 }
51 this.startIndexField = startIndexField;
52
53 if (StringUtils.isEmpty(endIndexName)) {
54 throw new RuntimeException("endIndexKey field must be defined !!!");
55 }
56 this.endIndexName = endIndexName;
57
58 if (StringUtils.isEmpty(endIndexField)) {
59 throw new RuntimeException("endIndexField field must be defined !!!");
60 }
61 this.endIndexField = endIndexField;
62
63 this.skipOnError = skipOnError;
64
65 this.batchDb = graphDb;
66 }
67
68
69
70
71
72
73
74 public void create(Object incoming, List<String> columnList) {
75 try {
76 Long startNode = this.batchDb.findNodeInBatchIndex(startIndexName, this.getObjectProperty(incoming, startIndexField));
77 Long endNode = this.batchDb.findNodeInBatchIndex(endIndexName, this.getObjectProperty(incoming, endIndexField));
78 Map<String, Object> properties = constructMapFromObject(incoming, columnList);
79
80 if (startNode != null && endNode != null) {
81
82 String type = (String) this.getObjectProperty(incoming, relationshipTypeField);
83 if (StringUtils.isNotEmpty(type)) {
84
85 properties.remove(relationshipTypeField);
86 properties.remove(startIndexField);
87 properties.remove(endIndexField);
88
89 if (direction.endsWith("OUTGOING")) {
90 this.batchDb.getInserter()
91 .createRelationship(startNode, endNode, DynamicRelationshipType.withName(type),
92 properties);
93 } else {
94 this.batchDb.getInserter()
95 .createRelationship(endNode, startNode, DynamicRelationshipType.withName(type),
96 properties);
97 }
98 } else {
99 log.trace("Ignoring incoming object {" + properties.toString() + "} due to none relation type");
100 }
101 } else {
102 String msg = "Can't find start node " + startNode + " or end node " + endNode;
103 if (skipOnError) {
104 log.trace(msg);
105 } else {
106 throw new RuntimeException(msg);
107 }
108 }
109
110 } catch (IllegalAccessException e) {
111 throw new RuntimeException(e);
112 }
113 }
114
115
116
117
118 public void finish() {
119 }
120
121 }