View Javadoc
1   package org.neo4j.talend;
2   
3   import org.apache.commons.lang3.StringUtils;
4   import org.apache.log4j.Logger;
5   
6   import org.neo4j.graphdb.DynamicRelationshipType;
7   
8   import java.io.IOException;
9   import java.util.List;
10  import java.util.Map;
11  
12  public class Neo4jBatchInserterRelationship extends Neo4jBatchInserterAbstract {
13  
14      private static Logger log = Logger.getLogger(Neo4jBatchInserterRelationship.class);
15  
16      private String relationshipTypeField;
17      private String direction;
18      private String startIndexName;
19      private String startIndexField;
20      private String endIndexName;
21      private String endIndexField;
22  
23      private Boolean skipOnError;
24  
25      /**
26       * Constructor.
27       */
28      public Neo4jBatchInserterRelationship(Neo4jBatchDatabase graphDb, String relationshipTypeField, String direction,
29                                            String startIndexName, String startIndexField, String endIndexName, String endIndexField,
30                                            Boolean skipOnError) throws IOException {
31          super(graphDb);
32  
33          if (StringUtils.isEmpty(relationshipTypeField)) {
34              throw new RuntimeException("relationshipTypeField must be defined !!!");
35          }
36          this.relationshipTypeField = relationshipTypeField;
37  
38          if (StringUtils.isEmpty(direction)) {
39              throw new RuntimeException("direction field must be defined !!!");
40          }
41          this.direction = direction;
42  
43          if (StringUtils.isEmpty(startIndexName)) {
44              throw new RuntimeException("startIndexName field must be defined !!!");
45          }
46          this.startIndexName = startIndexName;
47  
48          if (StringUtils.isEmpty(startIndexField)) {
49              throw new RuntimeException("startIndexField field must be defined !!!");
50          }
51          this.startIndexField = startIndexField;
52  
53          if (StringUtils.isEmpty(endIndexName)) {
54              throw new RuntimeException("endIndexKey field must be defined !!!");
55          }
56          this.endIndexName = endIndexName;
57  
58          if (StringUtils.isEmpty(endIndexField)) {
59              throw new RuntimeException("endIndexField field must be defined !!!");
60          }
61          this.endIndexField = endIndexField;
62  
63          this.skipOnError = skipOnError;
64  
65          this.batchDb = graphDb;
66      }
67  
68      /**
69       * Create the relationship.
70       *
71       * @param incoming   Talend incoming object
72       * @param columnList Attribute list of Talend object
73       */
74      public void create(Object incoming, List<String> columnList) {
75          try {
76              Long startNode = this.batchDb.findNodeInBatchIndex(startIndexName, this.getObjectProperty(incoming, startIndexField));
77              Long endNode = this.batchDb.findNodeInBatchIndex(endIndexName, this.getObjectProperty(incoming, endIndexField));
78              Map<String, Object> properties = constructMapFromObject(incoming, columnList);
79  
80              if (startNode != null && endNode != null) {
81  
82                  String type = (String) this.getObjectProperty(incoming, relationshipTypeField);
83                  if (StringUtils.isNotEmpty(type)) {
84  
85                      properties.remove(relationshipTypeField);
86                      properties.remove(startIndexField);
87                      properties.remove(endIndexField);
88  
89                      if (direction.endsWith("OUTGOING")) {
90                          this.batchDb.getInserter()
91                                  .createRelationship(startNode, endNode, DynamicRelationshipType.withName(type),
92                                          properties);
93                      } else {
94                          this.batchDb.getInserter()
95                                  .createRelationship(endNode, startNode, DynamicRelationshipType.withName(type),
96                                          properties);
97                      }
98                  } else {
99                      log.trace("Ignoring incoming object {" + properties.toString() + "} due to none relation type");
100                 }
101             } else {
102                 String msg = "Can't find start node " + startNode + " or end node " + endNode;
103                 if (skipOnError) {
104                     log.trace(msg);
105                 } else {
106                     throw new RuntimeException(msg);
107                 }
108             }
109 
110         } catch (IllegalAccessException e) {
111             throw new RuntimeException(e);
112         }
113     }
114 
115     /**
116      * Finish the process.
117      */
118     public void finish() {
119     }
120 
121 }