eggs/py-1.4.0-py2.6.egg/py/_io/capture.py
changeset 307 c6bca38c1cbf
equal deleted inserted replaced
306:5ff1fc726848 307:c6bca38c1cbf
       
     1 import os
       
     2 import sys
       
     3 import py
       
     4 import tempfile
       
     5 
       
     6 try:
       
     7     from io import StringIO
       
     8 except ImportError:
       
     9     from StringIO import StringIO
       
    10 
       
    11 if sys.version_info < (3,0):
       
    12     class TextIO(StringIO):
       
    13         def write(self, data):
       
    14             if not isinstance(data, unicode):
       
    15                 data = unicode(data, getattr(self, '_encoding', 'UTF-8'))
       
    16             StringIO.write(self, data)
       
    17 else:
       
    18     TextIO = StringIO
       
    19 
       
    20 try:
       
    21     from io import BytesIO
       
    22 except ImportError:
       
    23     class BytesIO(StringIO):
       
    24         def write(self, data):
       
    25             if isinstance(data, unicode):
       
    26                 raise TypeError("not a byte value: %r" %(data,))
       
    27             StringIO.write(self, data)
       
    28 
       
    29 patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
       
    30 
       
    31 class FDCapture:
       
    32     """ Capture IO to/from a given os-level filedescriptor. """
       
    33 
       
    34     def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
       
    35         """ save targetfd descriptor, and open a new
       
    36             temporary file there.  If no tmpfile is
       
    37             specified a tempfile.Tempfile() will be opened
       
    38             in text mode.
       
    39         """
       
    40         self.targetfd = targetfd
       
    41         if tmpfile is None and targetfd != 0:
       
    42             f = tempfile.TemporaryFile('wb+')
       
    43             tmpfile = dupfile(f, encoding="UTF-8")
       
    44             f.close()
       
    45         self.tmpfile = tmpfile
       
    46         self._savefd = os.dup(self.targetfd)
       
    47         if patchsys:
       
    48             self._oldsys = getattr(sys, patchsysdict[targetfd])
       
    49         if now:
       
    50             self.start()
       
    51 
       
    52     def start(self):
       
    53         try:
       
    54             os.fstat(self._savefd)
       
    55         except OSError:
       
    56             raise ValueError("saved filedescriptor not valid, "
       
    57                 "did you call start() twice?")
       
    58         if self.targetfd == 0 and not self.tmpfile:
       
    59             fd = os.open(devnullpath, os.O_RDONLY)
       
    60             os.dup2(fd, 0)
       
    61             os.close(fd)
       
    62             if hasattr(self, '_oldsys'):
       
    63                 setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
       
    64         else:
       
    65             os.dup2(self.tmpfile.fileno(), self.targetfd)
       
    66             if hasattr(self, '_oldsys'):
       
    67                 setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
       
    68 
       
    69     def done(self):
       
    70         """ unpatch and clean up, returns the self.tmpfile (file object)
       
    71         """
       
    72         os.dup2(self._savefd, self.targetfd)
       
    73         os.close(self._savefd)
       
    74         if self.targetfd != 0:
       
    75             self.tmpfile.seek(0)
       
    76         if hasattr(self, '_oldsys'):
       
    77             setattr(sys, patchsysdict[self.targetfd], self._oldsys)
       
    78         return self.tmpfile
       
    79 
       
    80     def writeorg(self, data):
       
    81         """ write a string to the original file descriptor
       
    82         """
       
    83         tempfp = tempfile.TemporaryFile()
       
    84         try:
       
    85             os.dup2(self._savefd, tempfp.fileno())
       
    86             tempfp.write(data)
       
    87         finally:
       
    88             tempfp.close()
       
    89 
       
    90 
       
    91 def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
       
    92     """ return a new open file object that's a duplicate of f
       
    93 
       
    94         mode is duplicated if not given, 'buffering' controls
       
    95         buffer size (defaulting to no buffering) and 'raising'
       
    96         defines whether an exception is raised when an incompatible
       
    97         file object is passed in (if raising is False, the file
       
    98         object itself will be returned)
       
    99     """
       
   100     try:
       
   101         fd = f.fileno()
       
   102     except AttributeError:
       
   103         if raising:
       
   104             raise
       
   105         return f
       
   106     newfd = os.dup(fd)
       
   107     mode = mode and mode or f.mode
       
   108     if sys.version_info >= (3,0):
       
   109         if encoding is not None:
       
   110             mode = mode.replace("b", "")
       
   111             buffering = True
       
   112         return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
       
   113     else:
       
   114         f = os.fdopen(newfd, mode, buffering)
       
   115         if encoding is not None:
       
   116             return EncodedFile(f, encoding)
       
   117         return f
       
   118 
       
   119 class EncodedFile(object):
       
   120     def __init__(self, _stream, encoding):
       
   121         self._stream = _stream
       
   122         self.encoding = encoding
       
   123 
       
   124     def write(self, obj):
       
   125         if isinstance(obj, unicode):
       
   126             obj = obj.encode(self.encoding)
       
   127         elif isinstance(obj, str):
       
   128             pass
       
   129         else:
       
   130             obj = str(obj)
       
   131         self._stream.write(obj)
       
   132 
       
   133     def writelines(self, linelist):
       
   134         data = ''.join(linelist)
       
   135         self.write(data)
       
   136 
       
   137     def __getattr__(self, name):
       
   138         return getattr(self._stream, name)
       
   139 
       
   140 class Capture(object):
       
   141     def call(cls, func, *args, **kwargs):
       
   142         """ return a (res, out, err) tuple where
       
   143             out and err represent the output/error output
       
   144             during function execution.
       
   145             call the given function with args/kwargs
       
   146             and capture output/error during its execution.
       
   147         """
       
   148         so = cls()
       
   149         try:
       
   150             res = func(*args, **kwargs)
       
   151         finally:
       
   152             out, err = so.reset()
       
   153         return res, out, err
       
   154     call = classmethod(call)
       
   155 
       
   156     def reset(self):
       
   157         """ reset sys.stdout/stderr and return captured output as strings. """
       
   158         if hasattr(self, '_reset'):
       
   159             raise ValueError("was already reset")
       
   160         self._reset = True
       
   161         outfile, errfile = self.done(save=False)
       
   162         out, err = "", ""
       
   163         if outfile and not outfile.closed:
       
   164             out = outfile.read()
       
   165             outfile.close()
       
   166         if errfile and errfile != outfile and not errfile.closed:
       
   167             err = errfile.read()
       
   168             errfile.close()
       
   169         return out, err
       
   170 
       
   171     def suspend(self):
       
   172         """ return current snapshot captures, memorize tempfiles. """
       
   173         outerr = self.readouterr()
       
   174         outfile, errfile = self.done()
       
   175         return outerr
       
   176 
       
   177 
       
   178 class StdCaptureFD(Capture):
       
   179     """ This class allows to capture writes to FD1 and FD2
       
   180         and may connect a NULL file to FD0 (and prevent
       
   181         reads from sys.stdin).  If any of the 0,1,2 file descriptors
       
   182         is invalid it will not be captured.
       
   183     """
       
   184     def __init__(self, out=True, err=True, mixed=False,
       
   185         in_=True, patchsys=True, now=True):
       
   186         self._options = locals()
       
   187         self._save()
       
   188         if now:
       
   189             self.startall()
       
   190 
       
   191     def _save(self):
       
   192         in_ = self._options['in_']
       
   193         out = self._options['out']
       
   194         err = self._options['err']
       
   195         mixed = self._options['mixed']
       
   196         patchsys = self._options['patchsys']
       
   197         if in_:
       
   198             try:
       
   199                 self.in_ = FDCapture(0, tmpfile=None, now=False,
       
   200                     patchsys=patchsys)
       
   201             except OSError:
       
   202                 pass
       
   203         if out:
       
   204             tmpfile = None
       
   205             if hasattr(out, 'write'):
       
   206                 tmpfile = out
       
   207             try:
       
   208                 self.out = FDCapture(1, tmpfile=tmpfile,
       
   209                            now=False, patchsys=patchsys)
       
   210                 self._options['out'] = self.out.tmpfile
       
   211             except OSError:
       
   212                 pass
       
   213         if err:
       
   214             if out and mixed:
       
   215                 tmpfile = self.out.tmpfile
       
   216             elif hasattr(err, 'write'):
       
   217                 tmpfile = err
       
   218             else:
       
   219                 tmpfile = None
       
   220             try:
       
   221                 self.err = FDCapture(2, tmpfile=tmpfile,
       
   222                            now=False, patchsys=patchsys)
       
   223                 self._options['err'] = self.err.tmpfile
       
   224             except OSError:
       
   225                 pass
       
   226 
       
   227     def startall(self):
       
   228         if hasattr(self, 'in_'):
       
   229             self.in_.start()
       
   230         if hasattr(self, 'out'):
       
   231             self.out.start()
       
   232         if hasattr(self, 'err'):
       
   233             self.err.start()
       
   234 
       
   235     def resume(self):
       
   236         """ resume capturing with original temp files. """
       
   237         self.startall()
       
   238 
       
   239     def done(self, save=True):
       
   240         """ return (outfile, errfile) and stop capturing. """
       
   241         outfile = errfile = None
       
   242         if hasattr(self, 'out') and not self.out.tmpfile.closed:
       
   243             outfile = self.out.done()
       
   244         if hasattr(self, 'err') and not self.err.tmpfile.closed:
       
   245             errfile = self.err.done()
       
   246         if hasattr(self, 'in_'):
       
   247             tmpfile = self.in_.done()
       
   248         if save:
       
   249             self._save()
       
   250         return outfile, errfile
       
   251 
       
   252     def readouterr(self):
       
   253         """ return snapshot value of stdout/stderr capturings. """
       
   254         l = []
       
   255         for name in ('out', 'err'):
       
   256             res = ""
       
   257             if hasattr(self, name):
       
   258                 f = getattr(self, name).tmpfile
       
   259                 f.seek(0)
       
   260                 res = f.read()
       
   261                 f.truncate(0)
       
   262                 f.seek(0)
       
   263             l.append(res)
       
   264         return l
       
   265 
       
   266 class StdCapture(Capture):
       
   267     """ This class allows to capture writes to sys.stdout|stderr "in-memory"
       
   268         and will raise errors on tries to read from sys.stdin. It only
       
   269         modifies sys.stdout|stderr|stdin attributes and does not
       
   270         touch underlying File Descriptors (use StdCaptureFD for that).
       
   271     """
       
   272     def __init__(self, out=True, err=True, in_=True, mixed=False, now=True):
       
   273         self._oldout = sys.stdout
       
   274         self._olderr = sys.stderr
       
   275         self._oldin  = sys.stdin
       
   276         if out and not hasattr(out, 'file'):
       
   277             out = TextIO()
       
   278         self.out = out
       
   279         if err:
       
   280             if mixed:
       
   281                 err = out
       
   282             elif not hasattr(err, 'write'):
       
   283                 err = TextIO()
       
   284         self.err = err
       
   285         self.in_ = in_
       
   286         if now:
       
   287             self.startall()
       
   288 
       
   289     def startall(self):
       
   290         if self.out:
       
   291             sys.stdout = self.out
       
   292         if self.err:
       
   293             sys.stderr = self.err
       
   294         if self.in_:
       
   295             sys.stdin  = self.in_  = DontReadFromInput()
       
   296 
       
   297     def done(self, save=True):
       
   298         """ return (outfile, errfile) and stop capturing. """
       
   299         outfile = errfile = None
       
   300         if self.out and not self.out.closed:
       
   301             sys.stdout = self._oldout
       
   302             outfile = self.out
       
   303             outfile.seek(0)
       
   304         if self.err and not self.err.closed:
       
   305             sys.stderr = self._olderr
       
   306             errfile = self.err
       
   307             errfile.seek(0)
       
   308         if self.in_:
       
   309             sys.stdin = self._oldin
       
   310         return outfile, errfile
       
   311 
       
   312     def resume(self):
       
   313         """ resume capturing with original temp files. """
       
   314         self.startall()
       
   315 
       
   316     def readouterr(self):
       
   317         """ return snapshot value of stdout/stderr capturings. """
       
   318         out = err = ""
       
   319         if self.out:
       
   320             out = self.out.getvalue()
       
   321             self.out.truncate(0)
       
   322             self.out.seek(0)
       
   323         if self.err:
       
   324             err = self.err.getvalue()
       
   325             self.err.truncate(0)
       
   326             self.err.seek(0)
       
   327         return out, err
       
   328 
       
   329 class DontReadFromInput:
       
   330     """Temporary stub class.  Ideally when stdin is accessed, the
       
   331     capturing should be turned off, with possibly all data captured
       
   332     so far sent to the screen.  This should be configurable, though,
       
   333     because in automated test runs it is better to crash than
       
   334     hang indefinitely.
       
   335     """
       
   336     def read(self, *args):
       
   337         raise IOError("reading from stdin while output is captured")
       
   338     readline = read
       
   339     readlines = read
       
   340     __iter__ = read
       
   341 
       
   342     def fileno(self):
       
   343         raise ValueError("redirected Stdin is pseudofile, has no fileno()")
       
   344     def isatty(self):
       
   345         return False
       
   346     def close(self):
       
   347         pass
       
   348 
       
   349 try:
       
   350     devnullpath = os.devnull
       
   351 except AttributeError:
       
   352     if os.name == 'nt':
       
   353         devnullpath = 'NUL'
       
   354     else:
       
   355         devnullpath = '/dev/null'