-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathpydzipimport.py
More file actions
179 lines (144 loc) · 6.17 KB
/
pydzipimport.py
File metadata and controls
179 lines (144 loc) · 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""Replacement for `zipimport.zipimporter` which allows C extension
modules to be loaded.

To use::

    import pydzipimport
    pydzipimport.install()

Create a zip file which contains any number of modules or packages,
add the zip file to `sys.path` and import as usual.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2012 Bradley Froehle <brad.froehle@gmail.com>
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
__version__ = '0.1'
import imp
import os
import sys
import tempfile
import warnings
import zipimport
from importlib.machinery import EXTENSION_SUFFIXES
# Names exported by ``from pydzipimport import *``: the importer class, the
# loader it hands out for extension modules, and the two functions that swap
# the path hook in and out.
__all__ = [
'PydZipImporter',
'TemporaryExtensionFileLoader',
'install',
'uninstall',
]
def _call_with_frames_removed(f, *args, **kwds):
"""remove_importlib_frames in import.c will always remove sequences
of importlib frames that end with a call to this function
"""
return f(*args, **kwds)
class PydZipImporter(zipimport.zipimporter):
    """A zip importer that can also serve C extension modules.

    When an extension module is found in the archive, its bytes are read
    and passed to a `TemporaryExtensionFileLoader`, which writes the
    shared object to a temporary file.  Every other lookup is delegated
    to `zipimport.zipimporter`.
    """

    # Suffixes tried in order: the package form (``<sep>__init__<ext>``)
    # for every extension suffix first, then the plain extension suffixes.
    _extension_searchorder = (
        [os.sep + '__init__' + s for s in EXTENSION_SUFFIXES]
        + EXTENSION_SUFFIXES
    )

    def _get_extension_module_info(self, fullname):
        """Look up the extension module named 'fullname' in the archive.

        Returns a ``(suffix, fullpath)`` tuple for the first suffix in
        `_extension_searchorder` whose path exists in the archive, or
        `None` if no extension module is found.
        """
        # Only the last dotted component is used, relative to the prefix
        # this importer was created for.
        base = self.prefix + fullname.rpartition('.')[2]
        candidates = ((sfx, base + sfx) for sfx in self._extension_searchorder)
        return next((hit for hit in candidates if hit[1] in self._files), None)

    # The module-finding machinery below first checks the archive for a C
    # extension module and only then defers to zipimport.zipimporter.

    def is_package(self, fullname):
        """Return a bool signifying whether the module is a package or not."""
        info = self._get_extension_module_info(fullname)
        if info is not None:
            suffix, _ = info
            # A match on the ``__init__`` form means an extension package.
            if suffix.startswith(os.sep + '__init__'):
                return True
        return super().is_package(fullname)

    def find_module(self, fullname, path=None):
        """Return a loader for 'fullname', or None if it cannot be imported.

        Emits an ImportWarning when no loader was found but the name
        matched a directory lacking an ``__init__``.
        """
        loader, portions = self.find_loader(fullname, path)
        if loader is not None or not portions:
            return loader
        template = "Not importing directory {}: missing __init__"
        warnings.warn(template.format(portions[0]), ImportWarning)
        return loader

    def find_loader(self, fullname, path=None):
        """Return ``(loader, portions)`` for the module named 'fullname'.

        Extension modules in the archive get a
        `TemporaryExtensionFileLoader` and an empty portions list;
        anything else is answered by `zipimport.zipimporter.find_loader`,
        including the namespace-package portion case.
        """
        info = self._get_extension_module_info(fullname)
        if not info:
            return super().find_loader(fullname, path)

        suffix, member = info
        init_marker = os.sep + '__init__'
        # For packages, drop the ``<sep>__init__`` part so only the real
        # extension suffix remains.
        if suffix.startswith(init_marker):
            tail = suffix[len(init_marker):]
        else:
            tail = suffix
        # Suffix for the temporary file: ``.<module name><extension suffix>``.
        temp_suffix = '.' + fullname.split('.')[-1] + tail

        # Path reported for the module; it points inside the archive and
        # does not exist on disk.
        fakepath = self.archive + os.sep + member
        payload = self.get_data(member)
        loader = TemporaryExtensionFileLoader(
            fullname, fakepath, payload, temp_suffix)
        return (loader, [])
class TemporaryExtensionFileLoader:
    """An extension file loader which takes a (fake) path and the bytes
    of a shared object.

    The shared object (given in data) is written to a named temporary
    file before being loaded.

    Based upon `importlib.machinery.ExtensionFileLoader`.
    """

    def __init__(self, name, path, data, suffix):
        """Write 'data' to a named temporary file and remember the names.

        name   -- module name this loader is responsible for
        path   -- (fake) path later assigned to the module's ``__file__``
        data   -- contents of the shared object
        suffix -- file-name suffix for the temporary file
        """
        # The temporary file object is kept on the instance so it stays
        # open for as long as the loader exists.
        self.data = tempfile.NamedTemporaryFile(suffix=suffix)
        self.data.write(data)
        # Flush so the bytes are on disk before the file is loaded by name.
        self.data.flush()
        self.path = path
        self.name = name

    def load_module(self, fullname):
        """Load an extension module.

        'fullname' may be None, in which case the name given at
        construction time is used.  Raises ImportError if 'fullname'
        differs from the name this loader was created for.
        """
        # Inlined equivalent of importlib's @_check_name decorator.
        if fullname is None:
            fullname = self.name
        elif fullname != self.name:
            # Report the name that was actually requested.
            raise ImportError("loader cannot handle %s" % fullname,
                              name=fullname)
        is_reload = fullname in sys.modules
        try:
            module = _call_with_frames_removed(imp.load_dynamic,
                                               fullname, self.data.name)
            module.__file__ = self.path  # Set this to our fake path!
            if self.is_package(fullname) and not hasattr(module, '__path__'):
                module.__path__ = [os.path.split(self.path)[0]]
            # Inlined equivalent of @set_loader.
            if not hasattr(module, '__loader__'):
                module.__loader__ = self
            # Inlined equivalent of @set_package.
            if getattr(module, '__package__', None) is None:
                module.__package__ = module.__name__
                if not hasattr(module, '__path__'):
                    module.__package__ = module.__package__.rpartition('.')[0]
            return module
        except BaseException:
            # On a failed first import, remove any sys.modules entry for
            # this name; a failed reload leaves the existing entry alone.
            # The exception is always re-raised.
            if not is_reload and fullname in sys.modules:
                del sys.modules[fullname]
            raise

    def is_package(self, fullname):
        """Return True if the extension module is a package."""
        file_name = os.path.split(self.path)[1]
        return any(file_name == '__init__' + suffix
                   for suffix in EXTENSION_SUFFIXES)

    def get_code(self, fullname):
        """Return None as an extension module cannot create a code object."""
        return None

    def get_source(self, fullname):
        """Return None as extension modules have no source code."""
        return None
def install():
    """Put PydZipImporter where zipimport.zipimporter sits in sys.path_hooks.

    Raises ValueError if `zipimport.zipimporter` is not currently in
    `sys.path_hooks`.
    """
    hooks = sys.path_hooks
    slot = hooks.index(zipimport.zipimporter)
    hooks[slot] = PydZipImporter
    # Drop every cached importer along with the hook swap.
    sys.path_importer_cache.clear()
def uninstall():
    """Put zipimport.zipimporter back where PydZipImporter sits in sys.path_hooks.

    Raises ValueError if `PydZipImporter` is not currently in
    `sys.path_hooks`.
    """
    hooks = sys.path_hooks
    slot = hooks.index(PydZipImporter)
    hooks[slot] = zipimport.zipimporter
    # Drop every cached importer along with the hook swap.
    sys.path_importer_cache.clear()