-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathutil_proc_runner.py
45 lines (32 loc) · 1.34 KB
/
util_proc_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from typing import Any
from ..error.illegal_attr_checker import IllegalAttrChecker
from ..error.uncallable_namespace import UncallableNamespace
from ..utils.util_node_property_func_runner import NodePropertyFuncRunner
class UtilProcRunner(UncallableNamespace, IllegalAttrChecker):
    """
    Namespace object exposing the ``asNode`` and ``asNodes`` utility functions
    and the ``nodeProperty`` sub-namespace.

    The fully qualified function names are derived from ``self._namespace``;
    queries are executed through ``self._query_runner``.
    """

    def asNode(self, node_id: int) -> Any:
        """
        Get a node from a node id.

        Args:
            node_id: The id of the node to get.

        Returns:
            The node with the given id (the first cell of the query result).
        """
        # Build the qualified name locally rather than appending to
        # ``self._namespace``: mutating the instance would make a second call
        # on the same object query "<namespace>.asNode.asNode" and would also
        # corrupt the namespace used by ``asNodes`` and ``nodeProperty``.
        function_name = f"{self._namespace}.asNode"

        # TODO use call_function?
        result = self._query_runner.run_cypher(f"RETURN {function_name}({node_id}) AS node")

        return result.iat[0, 0]

    def asNodes(self, node_ids: list[int]) -> list[Any]:
        """
        Get a list of nodes from a list of node ids.

        Args:
            node_ids: The ids of the nodes to get.

        Returns:
            The nodes with the given ids (the first cell of the query result).
        """
        # See ``asNode``: keep ``self._namespace`` untouched so this object
        # stays reusable across calls.
        function_name = f"{self._namespace}.asNodes"

        # TODO use call_function?
        # The ids are interpolated using Python's list formatting (e.g. "[1, 2]").
        result = self._query_runner.run_cypher(f"RETURN {function_name}({node_ids}) AS nodes")

        return result.iat[0, 0]  # type: ignore

    @property
    def nodeProperty(self) -> NodePropertyFuncRunner:
        """Return a runner for the ``nodeProperty`` function under this namespace."""
        return NodePropertyFuncRunner(self._query_runner, self._namespace + ".nodeProperty", self._server_version)