######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Lib/ImportUtil.py,v 1.1.2.1 2006/09/24 20:59:35 jkloth Exp $ """ Utilites for working with Python PEP 302 import hooks. Copyright 2006 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """ from __future__ import generators import os import sys import imp import time import types import cStringIO try: from zipimport import zipimporter except ImportError: # placeholder for Python 2.2 to simplify code paths class zipimporter(object): pass __all__ = [ # Module Utilities 'FindLoader', 'FindImporter', 'GetImporter', 'IterModules', 'GetLastModified', 'GetSearchPath', # Resource Utilities 'OsPathToResource', 'NormalizeResource', 'GetResourceFilename', 'GetResourceString', 'GetResourceStream', 'GetResourceLastModified', ] # Indicate that the use of "special" names is handled in a "zip-safe" way. __zipsafe__ = True IMP_SEARCH_ORDER = [ desc[0] for desc in imp.get_suffixes() ] # ZIP imports always search for .pyc AND .pyo, but reverse their order # depending on the optimzation flag (-O). ZIP_SEARCH_ORDER = [ '.py', '.pyc', '.pyo'] if not __debug__: ZIP_SEARCH_ORDER.remove('.pyc') ZIP_SEARCH_ORDER.append('.pyc') try: # New in Python 2.5 (or setuptools 0.7+) from pkgutil import ImpImporter, ImpLoader, iter_importers, get_loader, \ find_loader, iter_modules, get_importer except ImportError: import marshal, inspect, re MODULE_TYPE_INFO = {} for suffix, mode, module_type in imp.get_suffixes(): MODULE_TYPE_INFO[module_type] = (suffix, mode) class ImpImporter: """PEP 302 Importer that wraps Python's "classic" import algorithm ImpImporter(dirname) produces a PEP 302 importer that searches that directory. ImpImporter(None) produces a PEP 302 importer that searches the current sys.path, plus any modules that are frozen or built-in. Note that ImpImporter does not currently support being used by placement on sys.meta_path. """ def __init__(self, path=None): if path is not None and not os.path.isdir(path): raise ImportError("not a directory") self.path = path def __repr__(self): return "" % self.path def find_module(self, fullname, path=None): # Note: we ignore 'path' argument since it is only used via # meta_path subname = fullname.split(".")[-1] if subname != fullname and self.path is None: return None if self.path is None: path = None else: path = [os.path.realpath(self.path)] try: file, filename, etc = imp.find_module(subname, path) except ImportError: return None return ImpLoader(fullname, file, filename, etc) def iter_modules(self, prefix=''): if self.path is None or not os.path.isdir(self.path): return yielded = {} filenames = os.listdir(self.path) filenames.sort() # handle packages before same-named modules for fn in filenames: modname = inspect.getmodulename(fn) if modname == '__init__' or modname in yielded: continue path = os.path.join(self.path, fn) ispkg = False if not modname and os.path.isdir(path) and '.' not in fn: modname = fn for fn in os.listdir(path): subname = inspect.getmodulename(fn) if subname == '__init__': ispkg = True break else: continue # not a package if modname and '.' not in modname: yielded[modname] = 1 yield prefix + modname, ispkg return class ImpLoader: """PEP 302 Loader that wraps Python's "classic" import algorithm """ code = None source = None def __init__(self, fullname, file, filename, etc): self.file = file self.filename = filename self.fullname = fullname self.etc = etc def load_module(self, fullname): self._reopen() try: mod = imp.load_module(fullname, self.file, self.filename, self.etc) finally: self.file.close() # Note: we don't set __loader__ because we want the module to # look normal; i.e. this is just a wrapper for standard import # machinery return mod def get_data(self, pathname): f = open(pathname, 'rb') try: data = f.read() finally: f.close() return data def _reopen(self): if self.file and self.file.closed: suffix, mode, module_type = self.etc self.file = open(self.filename, mode) return def _fix_name(self, fullname): if fullname is None: fullname = self.fullname elif fullname != self.fullname: raise ImportError("Loader for module %s cannot handle " "module %s" % (self.fullname, fullname)) return fullname def _get_package_loader(self): return ImpImporter(self.filename).find_module('__init__') def is_package(self, fullname): fullname = self._fix_name(fullname) return self.etc[2] == imp.PKG_DIRECTORY def get_code(self, fullname=None): fullname = self._fix_name(fullname) if self.code is None: module_type = self.etc[2] if module_type == imp.PY_SOURCE: source = self.get_source(fullname) self.code = compile(source, self.filename, 'exec') elif module_type == imp.PY_COMPILED: self._reopen() try: magic = self.file.read(4) if magic == imp.get_magic(): timestamp = self.file.read(4) self.code = marshal.load(self.file) finally: self.file.close() elif module_type == imp.PKG_DIRECTORY: self.code = self._get_package_loader().get_code() return self.code def get_source(self, fullname=None): fullname = self._fix_name(fullname) if self.source is None: module_type = self.etc[2] if module_type == imp.PY_SOURCE: self._reopen() try: self.source = self.file.read() finally: self.file.close() elif module_type == imp.PY_COMPILED: suffix, mode = MODULE_TYPE_INFO[imp.PY_COMPILED] filename = os.path.splitext(self.filename)[0] + suffix if os.path.exists(filename): f = open(filename, mode) try: self.source = f.read() finally: f.close() elif module_type == imp.PKG_DIRECTORY: self.source = self._get_package_loader().get_source() return self.source def get_filename(self, fullname=None): fullname = self._fix_name(fullname) module_type = self.etc[2] if module_type == imp.PKG_DIRECTORY: return self._get_package_loader().get_filename() elif module_type in MODULE_TYPE_INFO: return self.filename return None def get_importer(path_item): """Retrieve a PEP 302 importer for the given path item """ if sys.version < '2.3': importer = None else: try: importer = sys.path_importer_cache[path_item] except KeyError: for path_hook in sys.path_hooks: try: importer = path_hook(path_item) break except ImportError: pass else: importer = None sys.path_importer_cache.setdefault(path_item, importer) # The boolean values are used for caching valid and invalid # file paths for the built-in import machinery if importer in (None, True, False): try: importer = ImpImporter(path_item) except ImportError: importer = None return importer def iter_importers(fullname=''): if '.' in fullname: # Get the containing package's __path__ pkg = '.'.join(fullname.split('.')[:-1]) if pkg not in sys.modules: __import__(pkg) path = sys.modules[pkg].__path__ else: # sys.meta_path is available in Python 2.3+ for importer in getattr(sys, 'meta_path', []): yield importer path = sys.path for item in path: yield get_importer(item) if '.' not in fullname: yield ImpImporter() def get_loader(module_or_name): """Get a PEP 302 "loader" object for module_or_name If the module or package is accessible via the normal import mechanism, a wrapper around the relevant part of that machinery is returned. Returns None if the module cannot be found or imported. If the named module is not already imported, its containing package (if any) is imported, in order to establish the package __path__. """ if module_or_name in sys.modules: module_or_name = sys.modules[module_or_name] if isinstance(module_or_name, types.ModuleType): module = module_or_name loader = getattr(module, '__loader__', None) if loader is not None: return loader fullname = module.__name__ else: fullname = module_or_name return find_loader(fullname) def find_loader(fullname): """Find a PEP 302 "loader" object for fullname If fullname contains dots, path must be the containing package's __path__. Returns None if the module cannot be found or imported. """ for importer in iter_importers(fullname): loader = importer.find_module(fullname) if loader is not None: return loader return None def iter_zipimport_modules(importer, prefix): # make the path components regex safe sep = os.sep.replace('\\', '\\\\') path = prefix.replace(os.sep, sep) # using "non-greedy" matching in case a suffix is not just an # extension (like module.so for dlopen imports) modname = '[a-zA-Z_][a-zA-Z0-9_]*?' pkginit = sep + '__init__' suffix = '|'.join([ desc[0] for desc in imp.get_suffixes() ]) suffix = suffix.replace('.', '\\.') pattern = '^%s(%s)(%s)?(%s)$' % (path, modname, pkginit, suffix) submodule_match = re.compile(pattern).match yielded = {} dirlist = list(importer._files) dirlist.sort() for fn in dirlist: match = submodule_match(fn) if match is not None: modname, pkginit, suffix = match.groups() if pkginit: ispkg = True elif modname == '__init__': continue else: ispkg = False if modname not in yielded: yielded[modname] = True yield modname, ispkg return def iter_modules(path=None, prefix=''): """Yield submodule names+loaders for path or sys.path""" if path is None: importers = iter_importers() else: importers = map(get_importer, path) yielded = {} for importer in importers: if hasattr(importer, 'iter_modules'): modules = importer.iter_modules(prefix) elif isinstance(importer, zipimporter): modules = iter_zipimport_modules(importer, prefix) else: modules = [] for name, ispkg in modules: if name not in yielded: yielded[name] = 1 yield importer, name, ispkg return try: from pkg_resources import get_provider, resource_filename except ImportError: class DefaultProvider: """Resource provider for "classic" loaders""" def __init__(self, module): self.loader = getattr(module, '__loader__', None) self.module_path = os.path.dirname(module.__file__) def get_resource_filename(self, manager, resource_name): return self._fn(self.module_path, resource_name) def get_resource_stream(self, manager, resource_name): return open(self._fn(self.module_path, resource_name), 'rb') def get_resource_string(self, manager, resource_name): stream = self.get_resource_stream(manager, resource_name) try: return stream.read() finally: stream.close() def has_resource(self, resource_name): return self._has(self._fn(self.module_path, resource_name)) def resource_isdir(self, resource_name): return self._isdir(self._fn(self.module_path, resource_name)) def resource_listdir(self, resource_name): return self._listdir(self._fn(self.module_path, resource_name)) def _fn(self, base, resource_name): return os.path.join(base, *resource_name.split('/')) def _has(self, pathname): return os.path.exists(pathname) def _isdir(self, pathname): return os.path.isdir(pathname) def _listdir(self, pathname): return os.listdir(pathname) class ZipProvider(DefaultProvider): """Resource provider for ZIP loaders""" _dirindex = None def __init__(self, module): DefaultProvider.__init__(self, module) self.zipinfo = self.loader._files self.zip_pre = self.loader.archive + os.sep def get_resource_filename(self, manager, resource_name): raise NotImplementedError("not supported by ZIP loaders") def get_resource_stream(self, manager, resource_name): data = self.get_resource_string(manager, resource_name) return cStringIO.StringIO(data) def get_resource_string(self, manager, resource_name): pathname = self._fn(self.module_path, resource_name) return self.loader.get_data(pathname) def _zipinfo_name(self, pathname): # Convert a virtual filename (full path to file) into a zipfile # subpath usable with the zipimport directory cache for our # target archive. if pathname.startswith(self.zip_pre): return pathname[len(self.zip_pre):] raise ValueError("%s not in %s" % (pathname, self.zip_pre)) def _build_index(self): self._dirindex = index = {} for path in self.zipinfo: parts = path.split(os.sep) while parts: parent = os.sep.join(parts[:-1]) if parent in index: index[parent].append(parts[-1]) break else: index[parent] = [parts.pop()] return index def _has(self, pathname): arcname = self._zipinfo_name(fspath) return (arcname in self.zipinfo or arcname in (self._dirindex or self._build_index())) def _isdir(self, pathname): arcname = self._zipinfo_name(pathname) return arcname in (self._dirindex or self._build_index()) def _listdir(self, pathname): arcname = self._zipinfo_name(pathname) if arcname in (self._dirindex or self._build_index()): return self._dirindex[arcname][:] return [] def get_provider(fullname): if fullname not in sys.modules: __import__(fullname) module = sys.modules[fullname] loader = getattr(module, '__loader__', None) if loader is None: provider = DefaultProvider(module) elif isinstance(loader, zipimporter): provider = ZipProvider(module) else: raise NotImplementedError('unsupported loader type: %s' % loader) return provider _resource_manager = None else: # pkg_resources (aka setuptools) installed.; the resource_filename # top-level name is actually the bound method of the global # ResourceManager (at least that is what the PkgResources docs say). _resource_manager = resource_filename.im_self del resource_filename GetImporter = get_importer FindLoader = find_loader IterModules = iter_modules def FindImporter(fullname): """Find a PEP 302 "loader" object for fullname If fullname contains dots, path must be the containing package's __path__. Returns None if the module cannot be found or imported. """ for importer in iter_importers(fullname): if importer.find_module(fullname) is not None: return importer return None def GetLastModified(fullname): """ Returns the last modified timestamp for the given module. """ loader = get_loader(fullname) if hasattr(loader, 'get_filename'): suffixes = IMP_SEARCH_ORDER elif isinstance(loader, zipimporter): suffixes = ZIP_SEARCH_ORDER else: raise NotImplementedError("unsupported loader %s" % laoder) barename = '/' + fullname.replace('.', '/') if loader.is_package(fullname): barename += '/__init__' for suffix in suffixes: resource = barename + suffix try: timestamp = GetResourceLastModified(fullname, resource) except EnvironmentError: timestamp = 0 else: break return timestamp def GetSearchPath(fullname): loader = get_loader(fullname) if loader.is_package(fullname): if fullname in sys.modules: package = sys.modules[fullname] else: package = loader.load_module(fullname) return package.__path__ return None # -- Resource Handling ------------------------------------------------ def OsPathToResource(pathname): components = [] for component in pathname.split(os.sep): if component == '..': del components[-1:] elif component not in ('', '.'): components.append(component) resource = '/'.join(components) if pathname.startswith(os.sep): resource = '/' + resource return resource def NormalizeResource(package, resource): # normalize the resource pathname # Note, posixpath is not used as it doesn't remove leading '..'s components = [] for component in resource.split('/'): if component == '..': del components[-1:] elif component not in ('', '.'): components.append(component) absolute = resource.startswith('/') resource = '/'.join(components) provider = get_provider(package) if absolute: # Find the provider for the distribution directory module_path = provider.module_path packages = package.split('.') if not get_loader(package).is_package(package): del packages[-1] for module in packages: module_path = os.path.dirname(module_path) provider.module_path = module_path return (provider, resource) def GetResourceFilename(package, resource): """Returns a true filesystem name for the specified resource. """ provider, resource = NormalizeResource(package, resource) return provider.get_resource_filename(_resource_manager, resource) def GetResourceString(package, resource): """Return a string containing the contents of the specified resource. If the pathname is absolute it is retrieved starting at the path of the importer for 'fullname'. Otherwise, it is retrieved relative to the module within the loader. """ provider, resource = NormalizeResource(package, resource) return provider.get_resource_string(_resource_manager, resource) def GetResourceStream(package, resource): """Return a readable stream for specified resource""" provider, resource = NormalizeResource(package, resource) return provider.get_resource_stream(_resource_manager, resource) def GetResourceLastModified(package, resource): """Return a timestamp indicating the last-modified time of the specified resource. Raises IOError is the pathname cannot be found from the loader for 'fullname'. """ provider, resource = NormalizeResource(package, resource) if isinstance(provider.loader, zipimporter): if not resource: # it is the archive itself timestamp = os.stat(provider.module_path).st_mtime else: filename = provider._fn(provider.module_path, resource) zipinfo_name = provider._zipinfo_name(filename) try: dostime, dosdate = provider.zipinfo[zipinfo_name][5:7] except: import errno errorcode = errno.ENOENT raise IOError(errorcode, os.strerror(errorcode), zipinfo_name) timestamp = time.mktime(( ((dosdate >> 9) & 0x7f) + 1980, # tm_year ((dosdate >> 5) & 0x0f) - 1, # tm_mon ((dosdate >> 0) & 0x1f), # tm_mday ((dostime >> 11) & 0x1f), # tm_hour ((dostime >> 5) & 0x3f), # tm_min ((dostime >> 0) & 0x1f) * 2, # tm_secs 0, 0, -1)) else: filename = provider.get_resource_filename(_resource_manager, resource) timestamp = os.stat(filename).st_mtime return timestamp