|
1 from __future__ import generators |
|
2 import sys |
|
3 import inspect, tokenize |
|
4 import py |
|
5 from types import ModuleType |
|
6 cpy_compile = compile |
|
7 |
|
8 try: |
|
9 import _ast |
|
10 from _ast import PyCF_ONLY_AST as _AST_FLAG |
|
11 except ImportError: |
|
12 _AST_FLAG = 0 |
|
13 _ast = None |
|
14 |
|
15 |
|
class Source(object):
    """ an immutable object holding a source code fragment,
        possibly deindenting it.

        The fragment is stored as a list of lines without trailing
        newline characters in the ``lines`` attribute.
    """
    _compilecounter = 0

    def __init__(self, *parts, **kwargs):
        """ concatenate the lines of all ``parts`` into this object.

            Each part may be another Source, a tuple/list of lines,
            a string, or any other object whose source is looked up
            through getsource().  Empty parts (None, '', empty
            sequences, empty Source objects) contribute no lines.

            Keyword arguments:
            deindent -- deindent each part before adding it (default True)
            rstrip   -- drop trailing blank lines of string parts
                        (default True)
        """
        self.lines = lines = []
        de = kwargs.get('deindent', True)
        rstrip = kwargs.get('rstrip', True)
        for part in parts:
            # the falsy check has to be the head of a single
            # if/elif chain: as a separate "if" a part like None
            # would still fall through to the getsource() branch.
            if not part:
                partlines = []
            elif isinstance(part, Source):
                partlines = part.lines
            elif isinstance(part, (tuple, list)):
                partlines = [x.rstrip("\n") for x in part]
            elif isinstance(part, py.builtin._basestring):
                partlines = part.split('\n')
                if rstrip:
                    while partlines:
                        if partlines[-1].strip():
                            break
                        partlines.pop()
            else:
                partlines = getsource(part, deindent=de).lines
            if de:
                partlines = deindent(partlines)
            lines.extend(partlines)

    def __eq__(self, other):
        """ compare by lines with objects that have a ``lines``
            attribute and by text with plain strings; anything
            else is unequal.
        """
        try:
            return self.lines == other.lines
        except AttributeError:
            if isinstance(other, str):
                return str(self) == other
            return False

    def __getitem__(self, key):
        """ return a single line (a string) for an integer key
            and a new Source for a slice.  Slices with a step
            other than 1 raise IndexError.
        """
        if isinstance(key, int):
            return self.lines[key]
        else:
            if key.step not in (None, 1):
                raise IndexError("cannot slice a Source with a step")
            return self.__getslice__(key.start, key.stop)

    def __len__(self):
        """ return the number of lines. """
        return len(self.lines)

    def __getslice__(self, start, end):
        """ return a new Source holding the lines [start:end]. """
        newsource = Source()
        newsource.lines = self.lines[start:end]
        return newsource

    def strip(self):
        """ return new source object with trailing
            and leading blank lines removed.
        """
        start, end = 0, len(self)
        while start < end and not self.lines[start].strip():
            start += 1
        while end > start and not self.lines[end-1].strip():
            end -= 1
        source = Source()
        source.lines[:] = self.lines[start:end]
        return source

    def putaround(self, before='', after='', indent=' ' * 4):
        """ return a copy of the source object with
            'before' and 'after' wrapped around it.
            The lines of this object are prefixed with 'indent',
            the lines of 'before' and 'after' are not.
        """
        before = Source(before)
        after = Source(after)
        newsource = Source()
        lines = [ (indent + line) for line in self.lines]
        newsource.lines = before.lines + lines + after.lines
        return newsource

    def indent(self, indent=' ' * 4):
        """ return a copy of the source object with
            all lines indented by the given indent-string.
        """
        newsource = Source()
        newsource.lines = [(indent+line) for line in self.lines]
        return newsource

    def getstatement(self, lineno, assertion=False):
        """ return Source statement which contains the
            given linenumber (counted from 0).
        """
        start, end = self.getstatementrange(lineno, assertion)
        return self[start:end]

    def getstatementrange(self, lineno, assertion=False):
        """ return (start, end) tuple which spans the minimal
            statement region which contains the given lineno.

            Raises IndexError if lineno is out of range or, with
            assertion=True, if a line that looks like a super
            __init__ call is met while searching backwards.
        """
        # XXX there must be a better than these heuristic ways ...
        # XXX there may even be better heuristics :-)
        if not (0 <= lineno < len(self)):
            raise IndexError("lineno out of range")

        # 1. find the start of the statement
        from codeop import compile_command
        for start in range(lineno, -1, -1):
            if assertion:
                line = self.lines[start]
                # the following lines are not fully tested, change with care
                if 'super' in line and 'self' in line and '__init__' in line:
                    raise IndexError("likely a subclass")
                if "assert" not in line and "raise" not in line:
                    continue
            trylines = self.lines[start:lineno+1]
            # quick hack to indent the source and get it as a string in one go
            trylines.insert(0, 'def xxx():')
            trysource = '\n '.join(trylines)
            #              ^ space here
            try:
                compile_command(trysource)
            except (SyntaxError, OverflowError, ValueError):
                continue

            # 2. find the end of the statement
            for end in range(lineno+1, len(self)+1):
                trysource = self[start:end]
                if trysource.isparseable():
                    return start, end
        # NOTE(review): if every candidate start was skipped above,
        # 'end' was never assigned and this line raises a NameError
        # subclass instead of returning -- confirm callers expect that.
        return start, end

    def getblockend(self, lineno):
        """ return the index of the last line of the block that
            inspect.getblock() finds at the top of the lines
            starting at lineno.
        """
        # XXX
        lines = [x + '\n' for x in self.lines[lineno:]]
        blocklines = inspect.getblock(lines)
        #print blocklines
        return lineno + len(blocklines) - 1

    def deindent(self, offset=None):
        """ return a new source object deindented by offset.
            If offset is None then guess an indentation offset from
            the first non-blank line.  Subsequent lines which have a
            lower indentation offset will be copied verbatim as
            they are assumed to be part of multilines.
        """
        # XXX maybe use the tokenizer to properly handle multiline
        #     strings etc.pp?
        newsource = Source()
        newsource.lines[:] = deindent(self.lines, offset)
        return newsource

    def isparseable(self, deindent=True):
        """ return True if source is parseable, heuristically
            deindenting it by default.
        """
        # prefer the 'parser' module where it exists, otherwise
        # fall back to the builtin compile as the syntax check
        try:
            import parser
        except ImportError:
            syntax_checker = lambda x: compile(x, 'asd', 'exec')
        else:
            syntax_checker = parser.suite

        if deindent:
            source = str(self.deindent())
        else:
            source = str(self)
        try:
            #compile(source+'\n', "x", "exec")
            syntax_checker(source+'\n')
        except KeyboardInterrupt:
            raise
        except Exception:
            return False
        else:
            return True

    def __str__(self):
        """ return the lines joined by newlines (no trailing newline). """
        return "\n".join(self.lines)

    def compile(self, filename=None, mode='exec',
                flag=generators.compiler_flag,
                dont_inherit=0, _genframe=None):
        """ return compiled code object. if filename is None
            invent an artificial filename which displays
            the source/line position of the caller frame.

            mode, flag and dont_inherit are handed to the builtin
            compile.  _genframe is the frame used for the invented
            filename; it defaults to the frame of the caller.

            A SyntaxError is re-raised as a new SyntaxError whose
            message shows the source lines up to the failing line.
            Unless an AST was requested through flag, the lines are
            stored in linecache under the resulting filename.
        """
        if not filename or py.path.local(filename).check(file=0):
            if _genframe is None:
                _genframe = sys._getframe(1) # the caller
            fn,lineno = _genframe.f_code.co_filename, _genframe.f_lineno
            # the class-wide counter keeps invented filenames unique
            base = "<%d-codegen " % self._compilecounter
            self.__class__._compilecounter += 1
            if not filename:
                filename = base + '%s:%d>' % (fn, lineno)
            else:
                filename = base + '%r %s:%d>' % (filename, fn, lineno)
        source = "\n".join(self.lines) + '\n'
        try:
            # dont_inherit is part of the signature and has to reach
            # the builtin compile, it used to be dropped here
            co = cpy_compile(source, filename, mode, flag, dont_inherit)
        except SyntaxError:
            ex = sys.exc_info()[1]
            # re-represent syntax errors from parsing python strings
            msglines = self.lines[:ex.lineno]
            if ex.offset:
                msglines.append(" "*ex.offset + '^')
            msglines.append("syntax error probably generated here: %s" % filename)
            newex = SyntaxError('\n'.join(msglines))
            newex.offset = ex.offset
            newex.lineno = ex.lineno
            newex.text = ex.text
            raise newex
        else:
            if flag & _AST_FLAG:
                # an AST was requested, there is no code object
                # to register source lines for
                return co
            lines = [(x + "\n") for x in self.lines]
            if sys.version_info[0] >= 3:
                # XXX py3's inspect.getsourcefile() checks for a module
                # and a pep302 __loader__ ... we don't have a module
                # at code compile-time so we need to fake it here
                m = ModuleType("_pycodecompile_pseudo_module")
                py.std.inspect.modulesbyfile[filename] = None
                py.std.sys.modules[None] = m
                m.__loader__ = 1
            py.std.linecache.cache[filename] = (1, None, lines, filename)
            return co
|
238 |
|
239 # |
|
240 # public API shortcut functions |
|
241 # |
|
242 |
|
def compile_(source, filename=None, mode='exec', flags=
             generators.compiler_flag, dont_inherit=0):
    """ compile the given source to a raw code object,
        and maintain an internal cache which allows later
        retrieval of the source code for the code object
        and any recursively created code objects.

        If the _ast module is available and source is an AST
        node it is handed directly to the builtin compile.
        Everything else is wrapped in a Source object and
        compiled through Source.compile(), with the frame of
        the caller used for an invented filename.
        flags and dont_inherit are passed on in both cases.
    """
    if _ast is not None and isinstance(source, _ast.AST):
        # XXX should Source support having AST?
        return cpy_compile(source, filename, mode, flags, dont_inherit)
    _genframe = sys._getframe(1) # the caller
    s = Source(source)
    # forward dont_inherit as well, it used to be dropped on this path
    co = s.compile(filename, mode, flags,
                   dont_inherit=dont_inherit, _genframe=_genframe)
    return co
|
257 |
|
258 |
|
def getfslineno(obj):
    """ return a (fspath, lineno) tuple for the given object.

        The location is taken from py.code.Code(obj) if that can
        be constructed.  If it raises TypeError the file name is
        looked up through the inspect module and the line number
        through findsource(); on that path fspath and/or lineno
        are None when they cannot be determined.
    """
    try:
        codeobj = py.code.Code(obj)
    except TypeError:
        # fallback to the inspect module
        inspect_ = py.std.inspect
        filename = inspect_.getsourcefile(obj) or inspect_.getfile(obj)
        location = None
        if filename:
            location = py.path.local(filename) or None
        if not location:
            return location, None
        try:
            position = findsource(obj)[1]
        except IOError:
            position = None
        return location, position
    return codeobj.path, codeobj.firstlineno
|
278 |
|
279 # |
|
280 # helper functions |
|
281 # |
|
282 |
|
def findsource(obj):
    """ return a (source, lineno) tuple for the given object as
        reported by inspect.findsource(), with the lines wrapped
        in a Source object and stripped of trailing whitespace.
        Return (None, None) if the lookup fails; the exceptions
        in py.builtin._sysex are passed through.
    """
    try:
        rawlines, lineno = py.std.inspect.findsource(obj)
    except py.builtin._sysex:
        raise
    except:
        # best effort: any other failure means "no source found"
        return None, None
    found = Source()
    stripped = []
    for raw in rawlines:
        stripped.append(raw.rstrip())
    found.lines = stripped
    return found, lineno
|
293 |
|
def getsource(obj, **kwargs):
    """ return a Source object for the source code of obj.

        obj is first passed through py.code.getrawcode(), the
        text is then obtained from inspect.getsource().  Keyword
        arguments are handed on to the Source constructor.
    """
    rawcode = py.code.getrawcode(obj)
    try:
        text = inspect.getsource(rawcode)
    except IndentationError:
        # use a placeholder string literal as the source instead
        text = "\"Buggy python version consider upgrading, cannot get source\""
    assert isinstance(text, str)
    return Source(text, **kwargs)
|
302 |
|
def deindent(lines, offset=None):
    """ return a new list with the given lines deindented by offset.

        If offset is None it is guessed from the indentation of the
        first non-blank line (after expanding tabs).  Lines whose
        first 'offset' characters are not all whitespace and the
        continuation lines of multiline tokens are copied verbatim.
        The input sequence is not modified.
    """
    if offset is None:
        offset = 0
        for candidate in lines:
            expanded = candidate.expandtabs()
            content = expanded.lstrip()
            if content:
                offset = len(expanded) - len(content)
                break
    if offset == 0:
        return list(lines)

    def feed(pending):
        # readline-style source for the tokenizer: every line gets
        # its newline back, afterwards '' signals end of input
        for text in pending:
            yield text + '\n'
        while True:
            yield ''

    feeder = feed(lines)
    # generators have .next on python2 and .__next__ on python3
    readline = getattr(feeder, 'next', None)
    if readline is None:
        readline = feeder.__next__

    total = len(lines)
    result = []
    try:
        for token in tokenize.generate_tokens(readline):
            first_row = token[2][0]
            last_row = token[3][0]
            if first_row > total:
                # end of input reached
                break
            if first_row > len(result):
                # first token of a line that was not emitted yet
                text = lines[first_row - 1].expandtabs()
                if text.lstrip() and text[:offset].isspace():
                    text = text[offset:]
                result.append(text)
            # continuing lines of multiline tokens (i.e. multiline
            # strings) are taken over without deindenting
            for row in range(first_row, last_row):
                result.append(lines[row])
    except (IndentationError, tokenize.TokenError):
        pass
    # lines the tokenizer never got to, e.g. after an exception
    result.extend(lines[len(result):])
    return result