3 Fixme: make this into a proper click main 8 from collections
import defaultdict
13 from wirecell
import units
15 @click.group(
"pgraph")
19 Wire Cell Signal Processing Features 25 print (
"Node(%s) with no attributes"%tn)
40 return "[%s]"%self.
name 45 Add a port of end "head" or "tail" and ident (number). 50 return self.tn.replace(
":",
"_")
54 if "head" in self.
ports:
55 head =
"{%s}" % (
"|".join([
"<in%d>%d"%(num,num)
for num
in sorted(self.
ports[
"head"])]),)
59 for k,v
in sorted(self.attrs.items()):
60 if isinstance(v,list):
62 if isinstance(v,dict):
64 one =
"%s = %s" % (k,v)
66 body =
r"\n".join(body)
70 if "tail" in self.
ports:
71 tail =
"{%s}" % (
"|".join([
"<out%d>%d"%(num,num)
for num
in sorted(self.
ports[
"tail"])]),)
74 return "{%s}" % (
"|".join(ret),)
80 return type(x)
in [list]
83 return all(map(is_string, x))
87 Return GraphViz text. If attrs is a dictionary, append to the 88 node a list of its items. 93 tn = edge[end][
"node"]
97 n =
Node(tn, **attrs.get(tn, {}))
99 p = edge[end].
get(
"port",0)
105 for edge
in edge_dat:
106 t,tp =
get(edge,
"tail")
107 h,hp =
get(edge,
"head")
108 e =
'"%s":out%d -> "%s":in%d' % (t.dot_name(),tp, h.dot_name(),hp)
112 for tn,n
in list(nodes.items()):
113 for k,v
in n.attrs.items():
121 for maybe
in tocheck:
122 if maybe
not in attrs:
125 cn = nodes.get(maybe,
None);
127 cn =
Node(maybe, **attrs.get(maybe, {}))
130 e =
'"%s" -> "%s"[style=dashed,color=gray]' % (n.dot_name(), cn.dot_name())
135 ret = [
"digraph pgraph {",
137 "\tnode[shape=record];"]
138 for nn,node
in sorted(nodes.items()):
139 ret.append(
'\t"%s"[label="%s"];' % (node.dot_name(), node.dot_label()))
141 ret.append(
"\t%s;" % e)
143 return '\n'.join(ret);
147 raise RuntimeError(
'Got invalid filename (empty string).')
151 full_path = os.path.join(path, rel)
152 if full_path[-1] ==
'/':
153 raise RuntimeError(
'Attempted to import a directory')
155 if not os.path.isfile(full_path):
156 return full_path,
None 157 with
open(full_path)
as f:
158 return full_path, f.read()
162 paths = [path] + os.environ.get(
"WIRECELL_PATH",
"").
split(
":")
169 return full_path, content
170 raise RuntimeError(
'File not found')
176 Select out a part of obj based on a "."-separated path. Any 177 element of the path that looks like an integer will be cast to 178 one assuming it indexes an array. 180 jpath = jpath.split(
'.')
194 Given a list of nodes, return a dictionary of their "data" entries 195 keyed by 'type' or 'type:name' 199 if type(one) != dict:
200 print (
type(one),one)
202 if "name" in one
and one[
'name']:
204 tn +=
":" + one[
"name"]
205 ret[tn] = one.get(
"data", {})
208 @cli.command(
"dotify")
209 @click.option(
"--jpath", default=
"",
210 help=
"A dot-delimited path into the JSON to locate a graph-like object")
211 @click.option(
"--params/--no-params", default=
True,
212 help=
"Enable/disable the inclusion of contents of configuration parameters")
213 @click.argument(
"json-file")
214 @click.argument(
"out-file")
218 Convert a JSON file for a WCT job configuration based on the 219 Pgraph app into a dot file. 221 The JSON file needs to at least contain a list of edges found at 222 the given jpath. Use, e.g., "-1" to locate the last element of a 223 configuration sequence which is typically the config for a 224 Pgrapher. If indeed it is, its [jpath].data.edges attribute 225 will be located and the overall JSON data structure will be used 226 as a list of nodes. Otherwise [jpath].edges will be used and 227 [jpath].uses will be used to provide an initial list of node 230 if json_file.endswith(
".jsonnet"):
232 jtext = _jsonnet.evaluate_file(json_file, import_callback=jsonnet_import_callback)
236 dat = json.loads(jtext)
240 click.echo(
'failed to resolve path "%s" in object:\n' % (jpath))
247 if cfg.get(
"type",
"") ==
"Pgrapher":
248 print (
'Pgrapher object found at jpath: "%s" with %d nodes' % (jpath, len(dat)))
249 edges = cfg[
"data"][
"edges"]
253 uses = cfg.get(
"uses", list())
257 dtext =
dotify(edges, attrs)
258 ext = os.path.splitext(out_file)[1][1:]
259 dot =
"dot -T %s -o %s" % (ext, out_file)
260 proc = subprocess.Popen(dot, shell=
True, stdin = subprocess.PIPE)
261 proc.communicate(input=dtext.encode(
"utf-8"))
267 if '__main__' == __name__:
int open(const char *, int)
Opens a file descriptor.
Coord add(Coord c1, Coord c2)
def dotify(edge_dat, attrs)
def add_port(self, end, ident)
def resolve_path(obj, jpath)
def cmd_dotify(ctx, jpath, params, json_file, out_file)
def __init__(self, tn, attrs)
static QInternalList< QTextCodec > * all
def jsonnet_try_path(path, rel)
def jsonnet_import_callback(path, rel)
int read(int, char *, size_t)
Read bytes from a file descriptor.
void split(std::string const &s, char c, OutIter dest)
auto const & get(AssnsNode< L, R, D > const &r)
def dot_name(self, port=None)