Source code for sumolib.Dependencies

"""Database file handling.
"""

# pylint: disable=C0103
#                          Invalid name for type variable

import sys
import copy

if __name__ == "__main__":
    # if this module is directly called like a script, we have to add the path
    # ".." to the python search path in order to find modules named
    # "sumolib.[module]".
    sys.path.append("..")

import sumolib.ModuleSpec
import sumolib.JSON
import sumolib.utils

__version__="2.8.3" #VERSION#

assert __version__==sumolib.ModuleSpec.__version__
assert __version__==sumolib.JSON.__version__
assert __version__==sumolib.utils.__version__

# -----------------------------------------------
# utilities
# -----------------------------------------------

def _uq(s):
    """remove quotes from a string."""
    if len(s)<2:
        return s
    if s[0]=="'" or s[0]=='"':
        return s[1:-1]
    return s

# -----------------------------------------------
# class definitions
# -----------------------------------------------

[docs]class OldDB(sumolib.JSON.Container): """convert the old dependency database to the new format. returns a BuildCache and a Dependency object. """ def __init__(self, dict_= None, lock_timeout= None): """create the object.""" super(OldDB, self).__init__(dict_, lock_timeout)
[docs] def convert(self): """convert to a Dependencies and BuildCache object. """ new= {} for (modulename, moduledict) in self.datadict().items(): new_moduledict= new.setdefault(modulename, {}) for (versionname, versiondict) in moduledict.items(): new_versiondict= new_moduledict.setdefault(versionname, {}) for (propertyname, proptertydict) in versiondict.items(): if propertyname=="archs": continue new_versiondict[propertyname]= proptertydict return DB(new)
[docs]class DB(sumolib.JSON.Container): """the dependency database.""" # pylint: disable=R0904 # Too many public methods # pylint: disable=R0913 # Too many arguments
[docs] def selfcheck(self, msg): """raise exception if obj doesn't look like a dependency database.""" def _somevalue(d): """return kind of arbitrary value of a dict.""" keys= d.keys() key= keys[len(keys)//2] return d[key] while True: d= self.datadict() if not isinstance(d, dict): break module= _somevalue(d) if not isinstance(module, dict): break version= _somevalue(module) if not isinstance(version, dict): break deps= version.get("dependencies") if deps is not None: if not isinstance(deps, list): break src= version.get("source") if not src: break return raise ValueError("Error, dependency data is invalid %s" % msg)
def __init__(self, dict_= None, lock_timeout= None): """create the object.""" super(DB, self).__init__(dict_, lock_timeout)
[docs] def merge(self, other): """merge another Dependencies object to self. parameters: self - the object itself other - the other Dependencies object """ # pylint: disable=R0912 # Too many branches for modulename in other.iter_modulenames(): m= self.datadict().setdefault(modulename,{}) # iterate on stable, testing and unstable versions: for versionname in other.iter_versions(modulename): vdict = m.setdefault(versionname,{}) vdict2= other.datadict()[modulename][versionname] for dictname, dictval in vdict2.items(): if dictname=="weight": # take the weight from the new dict if present vdict[dictname]= dictval continue if dictname=="aliases": try: sumolib.utils.dict_update(\ vdict.setdefault(dictname,{}), dictval) except ValueError, e: raise ValueError(\ "module %s version %s aliases: %s" % \ (modulename, versionname, str(e))) continue if dictname=="source": if not vdict.has_key(dictname): vdict[dictname]= vdict2[dictname] continue if vdict[dictname]!=vdict2[dictname]: raise ValueError( "module %s version %s source: " "contradiction %s %s" % \ (modulename, versionname, repr(vdict[dictname]), repr(vdict2[dictname]))) continue if dictname=="dependencies": if not vdict.has_key(dictname): vdict[dictname]= sorted(vdict2[dictname]) continue if set(vdict[dictname])!=set(vdict2[dictname]): raise ValueError( "module %s version %s dependencies: " "contradiction %s %s" % \ (modulename, versionname, repr(vdict[dictname]), repr(vdict2[dictname]))) continue raise AssertionError("Error, unexpected dictname '%s'" % \ dictname)
[docs] def import_module(self, other, module_name, versionname): """copy the module data from another Dependencies object. This does a deepcopy of the data. """ m= self.datadict().setdefault(module_name,{}) m[versionname]= copy.deepcopy(\ other.datadict()[module_name][versionname])
[docs] def set_source_spec(self, module_name, versionname, sourcespec): """set sourcespec of a module, creates if it does not yet exist. returns True if the spec was changed, False if it wasn't. """ if not isinstance(sourcespec, sumolib.repos.SourceSpec): raise TypeError("Error, sourcespec '%s' is of wrong " "type" % repr(sourcespec)) version_dict= self.datadict().setdefault(module_name,{}) version= version_dict.setdefault(versionname, {}) old_source= version.get("source") if old_source is None: version["source"]= sourcespec.to_dict() return True old_spec= sumolib.repos.SourceSpec(old_source) return old_spec.change_source(sourcespec)
[docs] def set_source_spec_by_tag(self, module_name, versionname, tag): """try to change sourcespec by providing a tag. returns True if the spec was changed, False if it wasn't. """ version_dict= self.datadict().setdefault(module_name,{}) version= version_dict.setdefault(versionname, {}) old_source= version.get("source") if old_source is None: raise ValueError("Error, %s:%s source specification is empty, " "cannot simply change the tag." % \ (module_name, versionname)) old_spec= sumolib.repos.SourceSpec(old_source) try: ret= old_spec.change_source_by_tag(tag) except ValueError, e: raise ValueError("Error, '%s:%s' %s" % \ (module_name, versionname, _uq(str(e)))) version["source"]= old_spec.to_dict() return ret
[docs] def set_source_(self, module_name, versionname, sourcespec): """add a module with source spec.""" if not isinstance(sourcespec, sumolib.repos.SourceSpec): raise TypeError("Error, sourcespec '%s' is of wrong " "type" % repr(sourcespec)) version_dict= self.datadict().setdefault(module_name,{}) version= version_dict.setdefault(versionname, {}) version["source"]= sourcespec.to_dict()
[docs] def add_dependency(self, modulename, versionname, dep_modulename): """add dependency for a module:version. """ m_dict= self.datadict()[modulename] dep_list= m_dict[versionname].setdefault("dependencies",[]) dep_list.append(dep_modulename) dep_list.sort()
[docs] def del_dependency(self, modulename, versionname, dep_modulename): """delete dependency for a module:version if it exists. """ m_dict= self.datadict()[modulename] dep_list= m_dict[versionname].get("dependencies") if dep_list is None: raise ValueError("Error, '%s:%s' has no dependencies" % \ (modulename, versionname)) dep_set= set(dep_list) if not dep_modulename in dep_set: raise ValueError("Error, '%s:%s' doesn't depend on %s" % \ (modulename, versionname, dep_modulename)) dep_set.discard(dep_modulename) if not dep_set: # "dependencies" is now empty, remove it: del m_dict[versionname]["dependencies"] else: m_dict[versionname]["dependencies"]= sorted(list(dep_set))
[docs] def check(self): """do a consistency check on the db.""" msg= [] for modulename in self.iter_modulenames(): for versionname in self.iter_versions(modulename): for dep_modulename in self.iter_dependencies(modulename, versionname): try: self.assert_module(dep_modulename, None) except KeyError, e: msg.append("%s:%s: dependencies: %s" % \ (modulename, versionname, str(e))) return msg
[docs] def search_modules(self, rx_object): """search module names and source URLS for a regexp. Returns a list of tuples (modulename, versionname). """ results= [] for modulename in self.iter_modulenames(): if rx_object.search(modulename): for versionname in self.iter_versions(modulename): results.append((modulename, versionname)) continue for versionname in self.iter_versions(modulename): repr_st= self.module_source_string(modulename, versionname) if rx_object.search(repr_st): results.append((modulename, versionname)) return sorted(results)
[docs] def add_alias(self, modulename, versionname, alias_name, real_name): """add an alias for modulename:versionname.""" m_dict= self.datadict()[modulename] alias_dict= m_dict[versionname].setdefault("aliases",{}) if alias_dict.has_key(real_name): if alias_dict[real_name]==alias_name: return raise ValueError(\ "alias \"%s\" defined with two different names" % alias_name) alias_dict[real_name]= alias_name
[docs] def get_alias(self, modulename, versionname, dep_modulename): """return the alias or the original name for dep_modulename.""" a_dict= self.datadict()[modulename][versionname].get("aliases") if a_dict is None: return dep_modulename return a_dict.get(dep_modulename, dep_modulename)
[docs] def weight(self, modulename, versionname, new_weight= None): """set the weight factor.""" if new_weight is None: return self.datadict()[modulename][versionname].get("weight", 0) if not isinstance(new_weight, int): raise TypeError("Error, %s is not an integer" % repr(new_weight)) self.datadict()[modulename][versionname]["weight"]= new_weight
[docs] def assert_module(self, modulename, versionname): """do nothing if the module is found, raise KeyError otherwise. If versionname is None, just check that the modulename is known. """ d= self.datadict().get(modulename) if d is None: raise KeyError("Error, no data for module '%s'" % modulename) if versionname is None: return v= d.get(versionname) if v is None: raise KeyError("Error, version '%s' not found for module '%s'" % \ (versionname, modulename))
[docs] def dependencies_found(self, modulename, versionname): """returns True if dependencies are found for modulename:versionname. """ # may raise KeyError exception in this line: d= self.datadict()[modulename][versionname] return d.has_key("dependencies")
[docs] def module_source_dict(self, modulename, versionname): """return a dict {type:dict} for the module source.""" return self.datadict()[modulename][versionname]["source"]
[docs] def module_source_pair(self, modulename, versionname): """return a tuple (type,dict) for the module source.""" l= self.datadict()[modulename][versionname]["source"] return sumolib.utils.single_key_item(l)
[docs] def module_source_string(self, modulename, versionname): """return the source url or tar-file or path for a module.""" (tp,val)= self.module_source_pair(modulename, versionname) if tp not in sumolib.repos.known_sources: raise AssertionError("unexpected source tag '%s' at '%s:%s'" % \ (tp, modulename, versionname)) # due to variety of source specs, just return the "repr" string here: return repr(val)
[docs] def iter_dependencies(self, modulename, versionname): """return an iterator on dependency modulenames of a module.""" md= self.datadict().get(modulename) if md is None: raise KeyError("Error, module '%s' not found in dependency " "database" % modulename) d= md.get(versionname) if d is None: raise KeyError("Error, module '%s:%s' not found in dependency " "database" % (modulename,versionname)) d= self.datadict()[modulename][versionname] deps= d.get("dependencies") if deps is None: return iter([]) return iter(deps)
[docs] def depends_on_module(self, modulename, versionname, dependencyname): """returns True if given dependency is found. """ d= self.datadict()[modulename][versionname] deps= d.get("dependencies") if deps is None: return False return dependencyname in deps
[docs] def assert_complete_modulelist(self, moduledict): """test if a set of modules is complete. This means that all dependencies are part of the given modules. moduledict is a dictionary mapping modulename-->versionname. """ missing= set() for modulename,versionname in moduledict.items(): for dep in self.iter_dependencies(modulename, versionname): if not moduledict.has_key(dep): missing.add(dep) if missing: raise ValueError("Error, set of modules is incomplete, these " "modules are missing: %s" % (" ".join(missing)))
[docs] def sortby_weight(self, moduleversions, reverse= False): """sorts modules by weight. Order in a way that smaller weights come first. moduleversions: a list of pairs (modulename, versionname). """ local_weights= {} i=0 for m in moduleversions: local_weights[m]= i if reverse: i-=1 else: i+= 1 # now a list of tuples (weight,localweight,moduletuple) can easily be # sorted: sort_list= sorted([(self.weight(m[0],m[1]), local_weights[m], m) \ for m in moduleversions], reverse= reverse) # created a list of moduletuples from the result: return [m for (_,_,m) in sort_list]
[docs] def sortby_dependency(self, moduleversions, reverse= False): """sorts modules by dependencies. Order that dependent modules come *after* the modules they depend on. moduleversions: a list of pairs (modulename, versionname). """ # pylint: disable=R0914 # Too many local variables # pylint: disable=R0912 # Too many branches module_version_dict= dict(moduleversions) dependencies= {} weights= {} i=0 for m in moduleversions: weights[m]= i i+= 1 # collect all direct dependencies: modules_set= set(moduleversions) # dependencies will map (modulename,versionname)->set(deps) # with each dep: (dep_name, dep_version) # we also include indirect dependencies, but only those that # are part of the given moduleversions parameter. test_modules_set= modules_set while test_modules_set: new_modules_set= set() for (modulename, versionname) in test_modules_set: s= dependencies.setdefault((modulename,versionname), set()) for dep_name in self.iter_dependencies(modulename, versionname): # ignore dependencies that are not part of the given # moduleversions parameter: if not module_version_dict.has_key(dep_name): continue dep_version= module_version_dict[dep_name] s.add((dep_name,dep_version)) new_modules_set.add((dep_name, dep_version)) test_modules_set= new_modules_set # ensure that the "weight" of a module is always bigger than the # biggest weight of any of it's dependencies: changes= True while changes: changes= False for m in modules_set: deps= dependencies[m] if not deps: continue maxweight= max([weights[mod] for mod in deps]) if maxweight>= weights[m]: changes= True weights[m]= maxweight+1 # now a list of tuples (weight,moduletuple) can easily be sorted: sort_list= sorted([(weights[m],m) for m in modules_set], reverse= reverse) # created a list of moduletuples from the result: return [m for (_,m) in sort_list]
[docs] def iter_modulenames(self): """return an iterator on module names.""" return self.datadict().iterkeys()
[docs] def iter_versions(self, modulename): """return an iterator on versionnames of a module. """ for versionname, _ in self.datadict()[modulename].iteritems(): yield versionname
[docs] def sorted_moduleversions(self, modulename): """return an iterator on sorted versionnames of a module.""" return sorted(self.iter_versions(modulename), key= sumolib.utils.rev2key, reverse= True)
[docs] def patch_version(self, modulename, versionname, newversionname, do_replace): """add a new version to the database by copying the old one. do_replace: if True, replace the old version with the new one """ # pylint: disable=R0912 # Too many branches moduledata= self.datadict().get(modulename) if moduledata is None: raise ValueError("Error, module with name '%s' not found " "in dependency database" % modulename) if moduledata.has_key(newversionname): raise ValueError("Error, module %s: version %s already exists" % \ (modulename, newversionname)) d= copy.deepcopy(self.datadict()[modulename][versionname]) moduledata[newversionname]= d if do_replace: del moduledata[versionname]
[docs] def clonemodule(self, old_modulename, modulename, versions): """Take all versions of old_modulename to create modulename. """ old_moduledata= self.datadict()[old_modulename] if not versions: versions= list(self.iter_versions(old_modulename)) if self.datadict().has_key(modulename): raise ValueError("Error, module '%s' already exists" % \ modulename) m= self.datadict().setdefault(modulename,{}) for version in versions: m[version]= copy.deepcopy(old_moduledata[version])
[docs] def partial_copy_by_list(self, list_): """take items from the Dependencies object and create a new one. List must be a list of tuples in the form (modulename,versionname). This function copies modules whose versionname match *exactly* the given name, so "R1-3" and "1-3" are treated to be different. Note that the new Dependencies object only contains references of the data. This DOES NOT do a deep copy. """ new= self.__class__() for modulename, versionname in list_: d= new.datadict().setdefault(modulename, {}) # scan stable, testing and unstable versions: for version in self.iter_versions(modulename): if not sumolib.ModuleSpec.Spec.compare_versions(version,\ versionname, "eq"): continue d[version]= self.datadict()[modulename][version] return new
[docs] def partial_copy_by_modulespecs(self, modulespecs): """take items from the Dependencies object and create a new one. modulespecs must be a sumolib.ModuleSpec.Specs object. Note that this function treats versions like "R1-3" and "1-3" to be different. If no versions are defined for a module, take all versions. When no moduleversions are found, rause a ValueError exception. Note that the new Dependencies object only contains references of the data. This DOES NOT do a deep copy so you should NOT modify the result. """ if not isinstance(modulespecs, sumolib.ModuleSpec.Specs): raise TypeError("wrong type: '%s'" % repr(modulespecs)) new= self.__class__() for modulespec in modulespecs: modulename= modulespec.modulename d= new.datadict().setdefault(modulename, {}) # scan stable, testing and unstable versions: for version in self.iter_versions(modulename): if not modulespec.test(version): continue d[version]= self.datadict()[modulename][version] return new
[docs] def sets_dict(self, modulespecs): """create a dict of sets according to modulespecs. modulespecs must be a sumolib.ModuleSpec.Specs object. convert modulespecs to a sets dict:: { modulename1 : set(version1,version2), modulename2 : set(version1,version2), } """ if not isinstance(modulespecs, sumolib.ModuleSpec.Specs): raise TypeError("wrong type: '%s'" % repr(modulespecs)) new= {} for modulespec in modulespecs: modulename= modulespec.modulename s= new.setdefault(modulename, set()) found= False try: versions= list(self.iter_versions(modulename)) except KeyError, _: raise KeyError("Error, module '%s' not found in " "dependency database" % modulename) for version in versions: if not modulespec.test(version): continue found= True s.add(version) if not found: raise ValueError("Error, no data found in dependency " "database for module specification '%s'" % \ modulespec.to_string()) return new
[docs] def complete_sets_dict(self, sets_dict): """makes a sets_dict complete with respect to dependencies. A sets dict has this form: convert modulespecs to a sets dict:: { modulename1 : set(version1,version2), modulename2 : set(version1,version2), } For each dependency that is missing, this program creates a new entry in the sets dict which contains all possible versions for the missing module. Returns a set of modulenames of added dependencies. """ modules_added= set() modlist= sets_dict.keys() while modlist: new_modlist= [] for modulename in modlist: for versionname in sets_dict[modulename]: for dep_name in self.iter_dependencies(modulename, versionname): if not sets_dict.has_key(dep_name): modules_added.add(dep_name) sets_dict[dep_name]= \ set(self.iter_versions(dep_name)) new_modlist.append(dep_name) modlist= new_modlist return modules_added
[docs] def remove_missing_deps(self): """remove dependencies that are not part of the database.""" modules= set(self.iter_modulenames()) for modulename in self.iter_modulenames(): for versionname in self.iter_versions(modulename): if not self.dependencies_found(modulename, versionname): continue deletions= [] for dep_name in self.iter_dependencies(modulename, versionname): if dep_name not in modules: deletions.append(dep_name) for dep_name in deletions: try: self.del_dependency(modulename, versionname, dep_name) except ValueError, _: pass

Table Of Contents