     1 import os
     2 import sys
     3 import py
     4 import tempfile
     6 try:
     7     from io import StringIO
     8 except ImportError:
     9     from StringIO import StringIO
    11 if sys.version_info < (3,0):
    12     class TextIO(StringIO):
    13         def write(self, data):
    14             if not isinstance(data, unicode):
    15                 data = unicode(data, getattr(self, '_encoding', 'UTF-8'))
    16             StringIO.write(self, data)
    17 else:
    18     TextIO = StringIO
    20 try:
    21     from io import BytesIO
    22 except ImportError:
    23     class BytesIO(StringIO):
    24         def write(self, data):
    25             if isinstance(data, unicode):
    26                 raise TypeError("not a byte value: %r" %(data,))
    27             StringIO.write(self, data)
    29 patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
    31 class FDCapture:
    32     """ Capture IO to/from a given os-level filedescriptor. """
    34     def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
    35         """ save targetfd descriptor, and open a new
    36             temporary file there.  If no tmpfile is
    37             specified a tempfile.Tempfile() will be opened
    38             in text mode.
    39         """
    40         self.targetfd = targetfd
    41         if tmpfile is None and targetfd != 0:
    42             f = tempfile.TemporaryFile('wb+')
    43             tmpfile = dupfile(f, encoding="UTF-8")
    44             f.close()
    45         self.tmpfile = tmpfile
    46         self._savefd = os.dup(self.targetfd)
    47         if patchsys:
    48             self._oldsys = getattr(sys, patchsysdict[targetfd])
    49         if now:
    50             self.start()
    52     def start(self):
    53         try:
    54             os.fstat(self._savefd)
    55         except OSError:
    56             raise ValueError("saved filedescriptor not valid, "
    57                 "did you call start() twice?")
    58         if self.targetfd == 0 and not self.tmpfile:
    59             fd =, os.O_RDONLY)
    60             os.dup2(fd, 0)
    61             os.close(fd)
    62             if hasattr(self, '_oldsys'):
    63                 setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
    64         else:
    65             os.dup2(self.tmpfile.fileno(), self.targetfd)
    66             if hasattr(self, '_oldsys'):
    67                 setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
    69     def done(self):
    70         """ unpatch and clean up, returns the self.tmpfile (file object)
    71         """
    72         os.dup2(self._savefd, self.targetfd)
    73         os.close(self._savefd)
    74         if self.targetfd != 0:
    76         if hasattr(self, '_oldsys'):
    77             setattr(sys, patchsysdict[self.targetfd], self._oldsys)
    78         return self.tmpfile
    80     def writeorg(self, data):
    81         """ write a string to the original file descriptor
    82         """
    83         tempfp = tempfile.TemporaryFile()
    84         try:
    85             os.dup2(self._savefd, tempfp.fileno())
    86             tempfp.write(data)
    87         finally:
    88             tempfp.close()
    91 def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
    92     """ return a new open file object that's a duplicate of f
    94         mode is duplicated if not given, 'buffering' controls
    95         buffer size (defaulting to no buffering) and 'raising'
    96         defines whether an exception is raised when an incompatible
    97         file object is passed in (if raising is False, the file
    98         object itself will be returned)
    99     """
   100     try:
   101         fd = f.fileno()
   102     except AttributeError:
   103         if raising:
   104             raise
   105         return f
   106     newfd = os.dup(fd)
   107     mode = mode and mode or f.mode
   108     if sys.version_info >= (3,0):
   109         if encoding is not None:
   110             mode = mode.replace("b", "")
   111             buffering = True
   112         return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
   113     else:
   114         f = os.fdopen(newfd, mode, buffering)
   115         if encoding is not None:
   116             return EncodedFile(f, encoding)
   117         return f
   119 class EncodedFile(object):
   120     def __init__(self, _stream, encoding):
   121         self._stream = _stream
   122         self.encoding = encoding
   124     def write(self, obj):
   125         if isinstance(obj, unicode):
   126             obj = obj.encode(self.encoding)
   127         elif isinstance(obj, str):
   128             pass
   129         else:
   130             obj = str(obj)
   131         self._stream.write(obj)
   133     def writelines(self, linelist):
   134         data = ''.join(linelist)
   135         self.write(data)
   137     def __getattr__(self, name):
   138         return getattr(self._stream, name)
   140 class Capture(object):
   141     def call(cls, func, *args, **kwargs):
   142         """ return a (res, out, err) tuple where
   143             out and err represent the output/error output
   144             during function execution.
   145             call the given function with args/kwargs
   146             and capture output/error during its execution.
   147         """
   148         so = cls()
   149         try:
   150             res = func(*args, **kwargs)
   151         finally:
   152             out, err = so.reset()
   153         return res, out, err
   154     call = classmethod(call)
   156     def reset(self):
   157         """ reset sys.stdout/stderr and return captured output as strings. """
   158         if hasattr(self, '_reset'):
   159             raise ValueError("was already reset")
   160         self._reset = True
   161         outfile, errfile = self.done(save=False)
   162         out, err = "", ""
   163         if outfile and not outfile.closed:
   164             out =
   165             outfile.close()
   166         if errfile and errfile != outfile and not errfile.closed:
   167             err =
   168             errfile.close()
   169         return out, err
   171     def suspend(self):
   172         """ return current snapshot captures, memorize tempfiles. """
   173         outerr = self.readouterr()
   174         outfile, errfile = self.done()
   175         return outerr
   178 class StdCaptureFD(Capture):
   179     """ This class allows to capture writes to FD1 and FD2
   180         and may connect a NULL file to FD0 (and prevent
   181         reads from sys.stdin).  If any of the 0,1,2 file descriptors
   182         is invalid it will not be captured.
   183     """
   184     def __init__(self, out=True, err=True, mixed=False,
   185         in_=True, patchsys=True, now=True):
   186         self._options = locals()
   187         self._save()
   188         if now:
   189             self.startall()
   191     def _save(self):
   192         in_ = self._options['in_']
   193         out = self._options['out']
   194         err = self._options['err']
   195         mixed = self._options['mixed']
   196         patchsys = self._options['patchsys']
   197         if in_:
   198             try:
   199                 self.in_ = FDCapture(0, tmpfile=None, now=False,
   200                     patchsys=patchsys)
   201             except OSError:
   202                 pass
   203         if out:
   204             tmpfile = None
   205             if hasattr(out, 'write'):
   206                 tmpfile = out
   207             try:
   208                 self.out = FDCapture(1, tmpfile=tmpfile,
   209                            now=False, patchsys=patchsys)
   210                 self._options['out'] = self.out.tmpfile
   211             except OSError:
   212                 pass
   213         if err:
   214             if out and mixed:
   215                 tmpfile = self.out.tmpfile
   216             elif hasattr(err, 'write'):
   217                 tmpfile = err
   218             else:
   219                 tmpfile = None
   220             try:
   221                 self.err = FDCapture(2, tmpfile=tmpfile,
   222                            now=False, patchsys=patchsys)
   223                 self._options['err'] = self.err.tmpfile
   224             except OSError:
   225                 pass
   227     def startall(self):
   228         if hasattr(self, 'in_'):
   229             self.in_.start()
   230         if hasattr(self, 'out'):
   231             self.out.start()
   232         if hasattr(self, 'err'):
   233             self.err.start()
   235     def resume(self):
   236         """ resume capturing with original temp files. """
   237         self.startall()
   239     def done(self, save=True):
   240         """ return (outfile, errfile) and stop capturing. """
   241         outfile = errfile = None
   242         if hasattr(self, 'out') and not self.out.tmpfile.closed:
   243             outfile = self.out.done()
   244         if hasattr(self, 'err') and not self.err.tmpfile.closed:
   245             errfile = self.err.done()
   246         if hasattr(self, 'in_'):
   247             tmpfile = self.in_.done()
   248         if save:
   249             self._save()
   250         return outfile, errfile
   252     def readouterr(self):
   253         """ return snapshot value of stdout/stderr capturings. """
   254         l = []
   255         for name in ('out', 'err'):
   256             res = ""
   257             if hasattr(self, name):
   258                 f = getattr(self, name).tmpfile
   260                 res =
   261                 f.truncate(0)
   263             l.append(res)
   264         return l
   266 class StdCapture(Capture):
   267     """ This class allows to capture writes to sys.stdout|stderr "in-memory"
   268         and will raise errors on tries to read from sys.stdin. It only
   269         modifies sys.stdout|stderr|stdin attributes and does not
   270         touch underlying File Descriptors (use StdCaptureFD for that).
   271     """
   272     def __init__(self, out=True, err=True, in_=True, mixed=False, now=True):
   273         self._oldout = sys.stdout
   274         self._olderr = sys.stderr
   275         self._oldin  = sys.stdin
   276         if out and not hasattr(out, 'file'):
   277             out = TextIO()
   278         self.out = out
   279         if err:
   280             if mixed:
   281                 err = out
   282             elif not hasattr(err, 'write'):
   283                 err = TextIO()
   284         self.err = err
   285         self.in_ = in_
   286         if now:
   287             self.startall()
   289     def startall(self):
   290         if self.out:
   291             sys.stdout = self.out
   292         if self.err:
   293             sys.stderr = self.err
   294         if self.in_:
   295             sys.stdin  = self.in_  = DontReadFromInput()
   297     def done(self, save=True):
   298         """ return (outfile, errfile) and stop capturing. """
   299         outfile = errfile = None
   300         if self.out and not self.out.closed:
   301             sys.stdout = self._oldout
   302             outfile = self.out
   304         if self.err and not self.err.closed:
   305             sys.stderr = self._olderr
   306             errfile = self.err
   308         if self.in_:
   309             sys.stdin = self._oldin
   310         return outfile, errfile
   312     def resume(self):
   313         """ resume capturing with original temp files. """
   314         self.startall()
   316     def readouterr(self):
   317         """ return snapshot value of stdout/stderr capturings. """
   318         out = err = ""
   319         if self.out:
   320             out = self.out.getvalue()
   321             self.out.truncate(0)
   323         if self.err:
   324             err = self.err.getvalue()
   325             self.err.truncate(0)
   327         return out, err
   329 class DontReadFromInput:
   330     """Temporary stub class.  Ideally when stdin is accessed, the
   331     capturing should be turned off, with possibly all data captured
   332     so far sent to the screen.  This should be configurable, though,
   333     because in automated test runs it is better to crash than
   334     hang indefinitely.
   335     """
   336     def read(self, *args):
   337         raise IOError("reading from stdin while output is captured")
   338     readline = read
   339     readlines = read
   340     __iter__ = read
   342     def fileno(self):
   343         raise ValueError("redirected Stdin is pseudofile, has no fileno()")
   344     def isatty(self):
   345         return False
   346     def close(self):
   347         pass
   349 try:
   350     devnullpath = os.devnull
   351 except AttributeError:
   352     if == 'nt':
   353         devnullpath = 'NUL'
   354     else:
   355         devnullpath = '/dev/null'