-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathlabel.py
117 lines (102 loc) · 4.81 KB
/
label.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import re
import sys
import click
try:
from .entity_file import EntityFile, Type
from .exceptions import SchemaError
except:
from entity_file import EntityFile, Type
from exceptions import SchemaError
class Label(EntityFile):
    """Handler class for processing Label CSV files.

    Each data row of the input file is packed into a binary node entity and
    appended, under this file's label, to the shared query buffer.
    """

    def __init__(self, query_buffer, infile, label_str, config, filter_column=None):
        """Create a handler for one label (node) file.

        Args:
            query_buffer: Shared buffer that receives packed label tokens
                (``labels``), the identifier->node-ID map (``nodes``), the next
                node ID (``top_node_id``) and the running ``node_count``.
            infile: Open CSV file to read entities from; it is closed by
                ``process_entities``.
            label_str: Label name, forwarded to ``EntityFile.__init__``
                (presumably stored as ``self.entity_str`` — confirm there).
            config: Loader configuration; this class reads
                ``store_node_identifiers``, ``skip_invalid_nodes``,
                ``max_token_size`` and ``max_buffer_size``.
            filter_column: Optional column filter forwarded to ``EntityFile``.
        """
        # Assigned before the base initializer runs, which presumably parses the
        # header and may therefore call post_process_header_with_schema
        # (which sets this attribute) — confirm in EntityFile.
        self.id_namespace = None
        self.query_buffer = query_buffer
        super(Label, self).__init__(infile, label_str, config, filter_column)

    def process_schemaless_header(self, header):
        """Record column names for a header row without type annotations.

        The first column is always the node ID. If its name starts with an
        underscore, it is not a property and must not be introduced to the
        graph, so its column name is cleared to ``None``.

        Args:
            header: Sequence of raw header field strings.
        """
        self.id = 0
        for idx, field in enumerate(header):
            self.column_names[idx] = field.strip()
        # Test the stripped name (the one actually stored) so " _id" behaves
        # like "_id"; also avoid IndexError on an empty header or an empty
        # first column.
        if header and header[0].strip().startswith("_"):
            self.column_names[0] = None

    def post_process_header_with_schema(self, header):
        """Locate the ID column and its optional namespace in a typed header.

        Does nothing when node identifiers are not being stored: no ID field
        is required if we're only inserting nodes.

        Args:
            header: Sequence of raw header field strings; the entry at the ID
                offset is searched for a "(NAMESPACE)" suffix.

        Raises:
            SchemaError: if the schema does not declare exactly one ID column.
        """
        if self.config.store_node_identifiers is False:
            return

        # Verify that exactly one field is labeled ID.
        id_column_count = self.types.count(Type.ID_STRING) + self.types.count(Type.ID_INTEGER)
        if id_column_count != 1:
            raise SchemaError(
                f"Node file '{self.infile.name}' should have exactly one ID column."
            )

        # Track the offset containing the node ID.
        try:
            self.id = self.types.index(Type.ID_STRING)
        except ValueError:
            self.id = self.types.index(Type.ID_INTEGER)

        # If the ID field specifies an ID namespace in parentheses like
        # "val:ID(NAMESPACE)", capture the namespace.
        match = re.search(r"\((\w+)\)", header[self.id])
        if match:
            self.id_namespace = match.group(1)

    def update_node_dictionary(self, identifier):
        """Add identifier->ID pair to dictionary if we are building relations.

        The identifier is mapped to ``query_buffer.top_node_id``, which is then
        incremented. A repeated identifier is reported on stderr; unless
        ``config.skip_invalid_nodes`` is set the process exits with status 1,
        otherwise the identifier is remapped to the new node ID (the later
        occurrence wins).

        Args:
            identifier: Node identifier (already namespace-prefixed if needed).
        """
        if identifier in self.query_buffer.nodes:
            sys.stderr.write(
                "Node identifier '%s' was used multiple times - second occurrence at %s:%d\n"
                % (identifier, self.infile.name, self.reader.line_num)
            )
            if self.config.skip_invalid_nodes is False:
                sys.exit(1)
        self.query_buffer.nodes[identifier] = self.query_buffer.top_node_id
        self.query_buffer.top_node_id += 1

    def process_entities(self):
        """Pack every row of the input file into the query buffer.

        Rows that do not match the configured filter value are skipped.
        Packed rows accumulate in this object's partial binary, which is
        handed to the query buffer and sent whenever adding another row would
        reach ``config.max_token_size`` or ``config.max_buffer_size``. The
        input file is closed when processing ends, including when an error or
        ``SystemExit`` propagates.

        Raises:
            SchemaError: if a row cannot be packed; the message is prefixed
                with the file name and line number.
        """
        entities_created = 0
        try:
            with click.progressbar(
                self.reader,
                length=self.entities_count,
                label=self.entity_str,
                update_min_steps=100,
            ) as reader:
                for row in reader:
                    self.validate_row(row)
                    if self.filter_value is not None and row[self.filter_column_id] != self.filter_value:
                        continue

                    # Update the node identifier dictionary if necessary.
                    if self.config.store_node_identifiers:
                        id_field = row[self.id]
                        if self.id_namespace is not None:
                            id_field = self.id_namespace + "." + str(id_field)
                        self.update_node_dictionary(id_field)

                    try:
                        row_binary = self.pack_props(row)
                    except SchemaError as e:
                        # TODO why is line_num off by one?
                        raise SchemaError(
                            "%s:%d %s" % (self.infile.name, self.reader.line_num - 1, str(e))
                        ) from e
                    row_binary_len = len(row_binary)

                    # If the addition of this entity will make the binary token grow too large,
                    # send the buffer now.
                    # TODO how much of this can be made uniform w/ relations and moved to Querybuffer?
                    added_size = self.binary_size + row_binary_len
                    if (
                        added_size >= self.config.max_token_size
                        or self.query_buffer.buffer_size + added_size >= self.config.max_buffer_size
                    ):
                        self.query_buffer.labels.append(self.to_binary())
                        self.query_buffer.send_buffer()
                        self.reset_partial_binary()
                        # Push the label onto the query buffer again, as there are more entities to process.
                        self.query_buffer.labels.append(self.to_binary())

                    self.query_buffer.node_count += 1
                    entities_created += 1
                    self.binary_size += row_binary_len
                    self.binary_entities.append(row_binary)
            # Queue whatever remains in the partial binary.
            self.query_buffer.labels.append(self.to_binary())
        finally:
            # Close the input even when packing fails or a duplicate ID exits.
            self.infile.close()
        print("%d nodes created with label '%s'" % (entities_created, self.entity_str))