Skip to content
Snippets Groups Projects

V0.3

Merged mehtank requested to merge mehtank/rocolib:v0.3 into v0.3
Compare and
19 files
+ 694
845
Compare changes
  • Side-by-side
  • Inline
Files
19
@@ -7,12 +7,14 @@ import networkx as nx
from circlify import circlify
from rocolib import ROCOLIB_DIR
from rocolib.api.composables.ComponentComposable import ComponentComposable
from rocolib.api.Parameterized import Parameterized
from rocolib.api.Function import YamlFunction
from rocolib.utils.utils import prefix as prefixString
from rocolib.utils.utils import tryImport
from rocolib.utils.io import load_yaml
from rocolib.utils.nx2go import GraphVisualization as gv
from dash import Dash, html
log = logging.getLogger(__name__)
@@ -35,21 +37,23 @@ def getSubcomponentObject(component, name=None):
class Component(Parameterized):
@classmethod
def test(cls, params=None, **kwargs):
def test(cls, params=None, outputs=(), filedir=None, **kwargs):
c = cls()
if params:
for key, val in params.items():
c.setParameter(key, val)
filedir = kwargs.pop('filedir', None)
c.makeOutput(filedir, **kwargs)
c.makeOutput(outputs, filedir, **kwargs)
def __init__(self, yamlFile=None):
self._name = None
self.cName = type(self).__name__
Parameterized.__init__(self)
self.reset()
yf = yamlFile
if not yamlFile:
yf = type(self).__name__ + ".yaml"
else:
self.cName = yf
for fn in (yf,
os.path.join(ROCOLIB_DIR, yf),
os.path.join(ROCOLIB_LIBRARY, yf)):
@@ -60,6 +64,7 @@ class Component(Parameterized):
pass
if yamlFile:
raise ValueError(f"No suitable yamlfile found for {yamlFile}")
self.composables['component'] = ComponentComposable(self)
self.predefine()
self.define()
@@ -380,8 +385,8 @@ class Component(Parameterized):
### Override to combine components' drawings to final drawing
pass
def append(self, name, prefix, **kwargs):
component = self.getSubcomponent(name)
def append(self, prefix, **kwargs):
component = self.getSubcomponent(prefix)
allPorts = set()
for key in component.interfaces:
@@ -390,27 +395,40 @@ class Component(Parameterized):
except TypeError:
# interface is not iterable, i.e. a single port
allPorts.add(component.getInterface(key))
for port in allPorts:
port.prefix(prefix)
for (key, composable) in component.composables.items():
if key not in self.composables:
self.composables[key] = composable.new()
self.composables[key].append(composable, prefix, **kwargs)
def attach(self, interface_1, interface_2, kwargs):
(fromName, fromPort) = interface_1
(toName, toPort) = interface_2
for (key, composable) in self.composables.items():
for port in allPorts:
port.update(component, composable, self.composables[key], prefix)
def attach(self, from_interface, to_interface, kwargs):
ifrom = self.getInterfaces(*from_interface)
ito = self.getInterfaces(*to_interface)
# Interfaces can contain multiple ports, so try each pair of ports
if not isinstance(ifrom, (list, tuple)):
ifrom = [ifrom]
if not isinstance(ito, (list, tuple)):
ito = [ito]
if len(ifrom) != len(ito):
raise AttributeError(f"Number of ports don't match connecting interface {from_interface} to {to_interface}")
for (fromPort, toPort) in zip(ifrom, ito):
try:
composable.attachInterfaces(self.getInterfaces(fromName, fromPort),
self.getInterfaces(toName, toPort),
kwargs)
fromPort.attachTo(toPort, **kwargs)
toPort.attachFrom(fromPort, **kwargs)
for (key, composable) in self.composables.items():
composable.attach(fromPort, toPort, **kwargs)
except:
logstr = "Error in attach: \n"
logstr += f" from ({fromName}, {fromPort}): "
logstr += self.getInterfaces(fromName, fromPort).toString()
logstr += f" from {from_interface}: "
logstr += fromPort.toString()
logstr += "\n"
logstr += f" to ({toName}, {toPort}): "
logstr += self.getInterfaces(toName, toPort).toString()
logstr += f" to {to_interface}: "
logstr += toPort.toString()
log.error(logstr)
raise
@@ -478,10 +496,7 @@ class Component(Parameterized):
try:
obj.make(useDefaultParameters)
for (key, composable) in obj.composables.items():
if key not in self.composables:
self.composables[key] = composable.new()
self.append(name, name, **kwargs)
self.append(name, **kwargs)
except:
log.error("Error in subclass %s, instance %s" % (classname, name))
raise
@@ -496,17 +511,6 @@ class Component(Parameterized):
(toComponent, toPort),
kwargs)
def informComposables(self):
# Let composables know what components and interfaces exist
# TODO remove this when we have a better way of letting composables
# know about components that have no ports (ex Bluetooth module driver)
for (key, composable) in self.composables.items():
for (name, sc) in self.subcomponents.items():
composable.addComponent(sc['object'])
for (name, value) in self.interfaces.items():
if value is not None:
composable.addInterface(self.getInterface(name))
# set useDefaultParameters = False to replace unset parameters with sympy variables
def make(self, useDefaultParameters=True):
if useDefaultParameters:
@@ -519,92 +523,52 @@ class Component(Parameterized):
self.evalComponent(scName, useDefaultParameters) # Merge composables from all subcomponents and tell them my components exist
self.evalConnections(scName) # Tell composables which interfaces are connected
self.informComposables() # Tell composables that my interfaces exist
self.assemble()
###
# OUTPUT PHASE
###
def makeComponentMap(self, scd=None, full=None, idnum = 0):
if scd is None:
g = self.componentGraph()
full = nx.DiGraph()
full.add_node(0, name="", component=self.getName())
title = f"<b>{self.getName()}</b>"
else:
g = scd['graph']
title = f"<b>{scd['component']}</b><br>{scd['name']}"
data = dict(id=idnum,
datum=(g.graph['depth']+1)**2,
children=[dict(id=len(full), datum=0.5)])
full.add_node(len(full), title=title)
if g:
f = nx.convert_node_labels_to_integers(g, len(full))
full = nx.disjoint_union(full, f)
for sc, scd in f.nodes(data=True):
full, cd = self.makeComponentMap(scd, full, sc)
data["children"].append(cd)
return full, data
def drawComponentTree(self, basename, stub):
g, data = self.makeComponentMap()
circles = circlify([data])
tree = gv(g,
pos = {c.ex['id']: (c.x, c.y) for c in circles},
node_size = {c.ex['id']: c.r for c in circles},
node_text = lambda k, v : v.get("title", ""),
node_border_width = lambda k, v: "title" not in v and 2 or 0,
edge_width = 0,
)
fig = tree.create_figure(width=1024, height=768)
if basename:
fig.write_image(basename+stub)
ret = basename+stub
def visualize(self, outputs=True, **ka):
widgets = self.makeOutput(outputs, widgets=True, **ka)
app = Dash(__name__)
elts = [
html.H2('RoCo component visualizer'),
html.H1(f"Component: {self.getName()}"),
]
for c, w in widgets.items():
elts.append(html.H3(f"Composable: {c}"))
for k, v in w.items():
elts.append(html.H4(f"Widget: {k}"))
elts.append(html.P(v['desc']))
elts.append(v['widget'])
app.layout = html.Div(elts)
app.run_server(debug=True)
def makeOutput(self, outputs=(), filedir=None, widgets=False, **ka):
if filedir:
log.info(f"Compiling robot designs to directory {filedir} ...")
elif widgets:
log.info(f"Generating visualization ...")
else:
ret = fig.to_image(format="png")
return ret
def makeOutput(self, filedir=".", **kwargs):
log.info(f"Compiling robot designs to directory {filedir} ...")
def kw(arg, default=kwargs.get("default", True)):
return kwargs.get(arg, default)
log.info(f"Compiling robot design dictionary ...")
if kw("remake", True):
self.make(kw("useDefaultParameters", True))
if ka.pop("remake", True):
self.make(ka.pop("useDefaultParameters", True))
log.debug(f"... done making {self.getName()}.")
# XXX: Is this the right way to do it?
import os
try:
os.makedirs(filedir)
filedir and os.makedirs(filedir)
except:
pass
# Process composables in some ordering based on type
orderedTypes = ['electrical', 'ui', 'code'] # 'code' needs to know about pins chosen by 'electrical', and 'code' needs to know about IDs assigned by 'ui'
# First call makeOutput on the ones of a type whose order is specified
rets = {}
for composableType in orderedTypes:
if composableType in self.composables:
kwargs["name"] = composableType
ss = self.composables[composableType].makeOutput(filedir, **kwargs)
rets[composableType] = ss
# Now call makeOutput on the ones whose type did not care about order
for (composableType, composable) in self.composables.items():
if composableType not in orderedTypes:
kwargs["name"] = composableType
ss = self.composables[composableType].makeOutput(filedir, **kwargs)
rets[composableType] = ss
if kw("tree"):
log.info("Generating hierarchy tree...")
rets["component"] = self.drawComponentTree(filedir, "/tree.png")
log.debug("done making tree.")
for (name, composable) in self.composables.items():
ss = composable.makeOutput(name, outputs, filedir, widgets, **ka)
rets[name] = ss
log.info("Happy roboting!")
return rets