# pylint: disable=C0302
# Too many lines in module
"""Utilities for the SUMO scripts.
"""
# pylint: disable=C0322
# Operator not preceded by a space
# pylint: disable=C0103
# Invalid name for type variable
# pylint: disable=W0141
# Used builtin function ...
import sys
import os
import os.path
import re
__version__="2.8.3" #VERSION#
_pyver= (sys.version_info[0], sys.version_info[1])
if _pyver < (2,5):
sys.exit("ERROR: SUMO requires at least Python 2.5, "
"your version is %d.%d" % _pyver)
# -----------------------------------------------
# tracing support
# -----------------------------------------------
[docs]def show_progress(cnt, cnt_max, message= None):
"""show progress on stderr.
"""
if message:
sys.stderr.write("'.' for every %s %s\n" % (cnt_max, message))
cnt-= 1
if cnt<0:
sys.stderr.write(".")
sys.stderr.flush()
cnt= cnt_max
return cnt
# -----------------------------------------------
# user interaction
# -----------------------------------------------
[docs]def ask_yes_no(question, force_yes= None):
"""ask a yes or no question.
returns:
True - if the user answered "yes" or "y"
False - if the user answered "no" or "n"
"""
if force_yes is not None:
if force_yes:
return True
question+= " "
while True:
inp= raw_input(question).lower().strip()
if inp in ["yes","y"]:
return True
if inp in ["no", "n"]:
return False
print "\tplease enter 'y', 'yes, 'n' or 'no'"
question=""
[docs]def ask_abort(question, force_yes= None):
"""ask if the user wants to abort the program.
Aborts the program if the user enters "y".
"""
if force_yes is not None:
if force_yes:
return
if not ask_yes_no(question + "Enter 'y' to continue or "
"'n' to abort the program"):
sys.exit("program aborted by user request")
# -----------------------------------------------
# data structure utilities
# -----------------------------------------------
[docs]def opt_join(option, do_sort= False):
"""join command line option values to a single list.
Here is an example:
>>> a=["a b","c","d e f"]
>>> opt_join(a)
['a', 'b', 'c', 'd', 'e', 'f']
"""
if option is None:
return
lst= " ".join(option).split()
if do_sort:
lst.sort()
return lst
[docs]def dict_of_sets_add(dict_, key, val):
"""add a key, create a set if needed.
Here is an example:
>>> import pprint
>>> d= {}
>>> dict_of_sets_add(d,"a",1)
>>> dict_of_sets_add(d,"a",2)
>>> dict_of_sets_add(d,"b",1)
>>> pprint.pprint(d)
{'a': set([1, 2]), 'b': set([1])}
"""
l= dict_.get(key)
if l is None:
dict_[key]= set([val])
else:
l.add(val)
[docs]def dict_sets_to_lists(dict_):
"""change values from sets to sorted lists.
Here is an example:
>>> import pprint
>>> d= {'a': set([1, 2]), 'b': set([1])}
>>> ld= dict_sets_to_lists(d)
>>> pprint.pprint(ld)
{'a': [1, 2], 'b': [1]}
"""
new= {}
for (k,v) in dict_.items():
new[k]= sorted(list(v))
return new
# -----------------------------------------------
# file utilities
# -----------------------------------------------
[docs]def file_w_open(filename, verbose, dry_run):
"""open a file for write."""
if verbose:
print "opening file '%s'" % filename
if dry_run:
return None
return open(filename, "w")
[docs]def file_write(fh, st, verbose, dry_run):
"""write a line to a file."""
if verbose:
print st,
if not dry_run:
fh.write(st)
[docs]def mk_text_file(filename, lines, verbose, dry_run):
"""create a text file."""
if verbose:
if filename!="-":
print "creating",filename
if dry_run:
filename="-"
if filename=="-":
fh= None
# special settings for _fwrite:
verbose= True
dry_run= True
else:
fh= open(filename, "w")
for l in lines:
file_write(fh, l, verbose, dry_run)
if filename!="-":
fh.close()
# -----------------------------------------------
# directory utilities
# -----------------------------------------------
[docs]def changedir(newdir):
"""return the current dir and change to a new dir.
If newdir is empty, return <None>.
"""
if not newdir:
return
cwd= os.getcwd()
os.chdir(newdir)
return cwd
# The following is needed in order to support python2.5
# where os.walk cannot follow symbolic links
[docs]def dirwalk(start_dir):
"""walk directories, follow symbolic links.
Implemented to behave like os.walk
On Python newer than 2.5 os.walk can follow symbolic links itself.
"""
for (dirpath, dirnames, filenames) in os.walk(start_dir, topdown= True):
yield (dirpath, dirnames, filenames)
for dn in dirnames:
p= os.path.join(dirpath, dn)
if os.path.islink(p):
for (dp, dn, fn) in dirwalk(p):
yield (dp, dn, fn)
# -----------------------------------------------
# version and path support
# -----------------------------------------------
[docs]def split_treetag(path):
"""split a path into head,treetag.
A path like /opt/Epics/R3.14.8/support/BIIcsem/1-0+001
is splitted to
"/opt/Epics/R3.14.8/support/BIIcsem/1-0","001"
Here is an example:
>>> split_treetag("abc/def/1-0")
['abc/def/1-0', '']
>>> split_treetag("abc/def/1-0+001")
['abc/def/1-0', '001']
"""
l= path.rsplit("+",1)
if len(l)<2:
l.append("")
return l
[docs]def rev2key(rev):
"""convert a revision number to a comparable string.
This is needed to compare revision tags.
Examples of valid revisions:
2.3.4
2-3-4
R2-3-4
seq-2.3.4
head
trunk
Here are some examples:
>>> rev2key("R2-3-4")
'002.003.004'
>>> rev2key("2-3-4")
'002.003.004'
>>> rev2key("head")
'-head'
>>> rev2key("test")
'-test'
>>> rev2key("test")<rev2key("R2-3-4")
True
>>> rev2key("R2-3-3")<rev2key("R2-3-4")
True
>>> rev2key("R2-3-5")<rev2key("R2-3-4")
False
>>> rev2key("R2-4-3")<rev2key("R2-3-4")
False
>>> rev2key("R1-3-4")<rev2key("R2-3-4")
True
>>> rev2key("R3-3-4")<rev2key("R2-3-4")
False
"""
if rev=="":
return "-"
t= tag2version(rev)
if not t[0].isdigit():
return "-" + rev
rev= t
# allow "-" and "." as separator for numbers:
rev= rev.replace("-",".")
l= rev.split(".")
n= []
# reformat all numbers in a 3-digit form:
for e in l:
try:
n.append("%03d" % int(e))
except ValueError, _:
n.append(str(e))
return ".".join(n)
[docs]def split_path(path):
"""split a path into (base, version, treetag).
Here are some examples:
>>> split_path("abc/def/1-3+001")
['abc/def', '1-3', '001']
>>> split_path("abc/def/1-3")
['abc/def', '1-3', '']
>>> split_path("abc/def/head+002")
['abc/def', 'head', '002']
"""
(head,tail)= os.path.split(path)
(version, treetag)= split_treetag(tail)
return [head, version, treetag]
[docs]def tag2version(ver):
"""convert a tag to a version.
Here are some examples:
>>> tag2version("1-2")
'1-2'
>>> tag2version("R1-2")
'1-2'
>>> tag2version("R-1-2")
'1-2'
>>> tag2version("seq-1-2")
'1-2'
>>> tag2version("head")
'head'
>>> tag2version("")
''
"""
if len(ver)<1:
return ver
mode=0
for i in xrange(len(ver)):
if mode==0:
if ver[i].isalpha():
continue
mode=1
if mode==1:
if ver[i]=="-" or ver[i]=="_":
mode=2
continue
mode=2
if mode==2:
if ver[i].isdigit():
return ver[i:]
else:
return ver
return ver
[docs]def is_standardpath(path, revision_tag):
"""checks if path is complient to Bessy convention for support paths.
Here are some examples:
>>> is_standardpath("support/mcan/2-3", "R2-3")
True
>>> is_standardpath("support/mcan/2-3", "R2-4")
False
>>> is_standardpath("support/mcan/head", "R2-3")
False
>>> is_standardpath("support/mcan/2-3+001", "R2-3")
True
"""
l= split_path(path)
return tag2version(l[1])==tag2version(revision_tag)
# -----------------------------------------------
# generic datastructure utilities
# -----------------------------------------------
[docs]def single_key(dict_):
"""dict must have exactly one key, return it.
Raises ValueError if the dict has more than one key.
"""
keys= dict_.keys()
if len(keys)!=1:
raise ValueError("dict hasn't exactly one key: %s" % repr(keys))
return keys[0]
[docs]def single_key_item(dict_):
"""dict must have exactly one key, return it and it's value.
Raises ValueError if the dict has more than one key.
"""
k= single_key(dict_)
return (k, dict_[k])
[docs]def dict_update(mydict, other, keylist= None):
"""update mydict with other but do not change existing values.
If keylist is given, update only these keys.
"""
if keylist is None:
keylist= other.keys()
for k in keylist:
v= other[k]
old_v= mydict.get(k)
if old_v is None:
mydict[k]= v
continue
if old_v==v:
continue
raise ValueError("key %s: contradicting values: %s %s" % \
(k,repr(old_v),repr(v)))
[docs]def list_update(list1, list2):
"""update a list with another.
In the returned list each element is unique and it is sorted.
"""
if not list1:
return sorted(list2[:])
s= set(list1)
s.update(list2)
return sorted(s)
# -----------------------------------------------
# classes
# -----------------------------------------------
[docs]class RegexpMatcher(object):
"""apply one or more regexes on strings."""
def __init__(self, regexes=None):
r"""initialize from a list of regexes.
"""
self._list= []
if regexes is not None:
for rx in regexes:
self.add(rx)
[docs] def add(self, regexp):
"""add a regexp."""
#print "RX: ",repr(regexp_pair)
rx= re.compile(regexp)
self._list.append(rx)
[docs] def match(self, str_):
"""match the regular expression to a string"""
if not self._list:
return False
for rx in self._list:
if rx.match(str_):
return True
return False
[docs] def search(self, str_):
"""search the regular expression in a string"""
if not self._list:
return False
for rx in self._list:
if rx.search(str_):
return True
return False
[docs]class RegexpPatcher(object):
"""apply one or more regexes on strings."""
def __init__(self, tuples=None):
r"""initialize from a list of tuples.
Here is a simple example:
>>> rx= RegexpPatcher(((r"a(\w+)",r"a(\1)"),(r"x+",r"x")))
>>> rx.apply("ab xx")
'a(b) x'
"""
self._list= []
if tuples is not None:
for regexp_pair in tuples:
self.add(regexp_pair)
[docs] def add(self, regexp_pair):
"""add a from-regexp to-regexp pair."""
#print "RX: ",repr(regexp_pair)
rx= re.compile(regexp_pair[0])
self._list.append((rx, regexp_pair[1]))
[docs] def apply(self, str_):
"""apply the regular expressions to a string"""
if not self._list:
return str_
for (rx, repl) in self._list:
str_= rx.sub(repl, str_)
return str_
[docs]class Hints(object):
"""Combine hints for sumo-scan"""
_empty= {}
def __init__(self, specs= None):
r"""initialize from a list of specification strings.
Here is an example:
>>> h= Hints()
>>> h.add(r'\d,TAGLESS')
>>> print h.flags("ab")
{}
>>> print h.flags("ab12")
{'tagless': True}
"""
self._hints= []
if specs is not None:
for spec in specs:
self.add(spec)
[docs] def add(self, spec):
"""add a new hint."""
parts= spec.split(",")
rx= re.compile(parts[0])
d= {}
for flag in parts[1:]:
# pylint: disable=W0212
# Access to a protected member
(key,val)= self.__class__._parse_flag(flag)
d[key]= val
self._hints.append((rx, d))
@staticmethod
def _parse_flag(flag):
"""parse a flag string."""
if flag=="PATH":
return ("path", True)
if flag=="TAGLESS":
return ("tagless", True)
raise ValueError("unknown flag: %s" % flag)
[docs] def flags(self, path):
"""return the flags of a path."""
for (rx, d) in self._hints:
if rx.search(path):
return d
# pylint: disable=W0212
# Access to a protected member
return self.__class__._empty
def _test():
"""perform internal tests."""
import doctest
doctest.testmod()
if __name__ == "__main__":
_test()