# Source code for gnes.preprocessor.io_utils.helper

import subprocess as sp


def kwargs_to_cmd_args(kwargs):
    """Flatten a mapping of options into a list of command-line tokens.

    Each key becomes a ``-key`` flag. When the associated value is not
    ``None`` its string form follows the flag as a separate token; a
    ``None`` value produces a bare flag with no argument.

    :param kwargs: mapping of option name to option value (or ``None``)
    :return: list of string tokens, in the mapping's iteration order
    """
    cmd_args = []
    for name, value in kwargs.items():
        flag = '-%s' % name
        if value is None:
            # Value-less switch: emit the flag on its own.
            cmd_args.append(flag)
        else:
            cmd_args.extend((flag, '%s' % str(value)))
    return cmd_args
def _check_input(input_fn: str, input_data: bytes): capture_stdin = (input_fn == 'pipe:') if capture_stdin and input_data is None: raise ValueError( "the buffered video data for stdin should not be empty")
def run_command_async(cmd_args,
                      pipe_stdin=True,
                      pipe_stdout=False,
                      pipe_stderr=False,
                      quiet=False):
    """Start a child process without waiting for it to finish.

    :param cmd_args: command and arguments handed to ``subprocess.Popen``
    :param pipe_stdin: attach a pipe to the child's stdin
    :param pipe_stdout: attach a pipe to the child's stdout
    :param pipe_stderr: attach a pipe to the child's stderr
    :param quiet: attach pipes to both stdout and stderr, regardless of
        ``pipe_stdout`` / ``pipe_stderr``
    :return: the started ``subprocess.Popen`` object
    """

    def _pipe_if(enabled):
        # ``None`` leaves the stream inherited from the parent process.
        return sp.PIPE if enabled else None

    return sp.Popen(
        cmd_args,
        stdin=_pipe_if(pipe_stdin),
        stdout=_pipe_if(pipe_stdout or quiet),
        stderr=_pipe_if(pipe_stderr or quiet),
        close_fds=True)
def wait(process):
    """Echo a process's stdout line by line until the process has exited.

    Each non-empty line read from ``process.stdout`` is stripped and
    printed. The loop ends once a read returns nothing and ``poll()``
    reports an exit status.

    :param process: a ``subprocess.Popen``-like object exposing a readable
        ``stdout`` and a ``poll()`` method; ``stdout`` must be a pipe
        (it is dereferenced unconditionally)
    :return: tuple ``(output, rc)`` where ``output`` is the last value
        read (the empty string or empty bytes seen at end of stream) and
        ``rc`` is the result of ``process.poll()``
    """
    while True:
        output = process.stdout.readline()
        # At end of stream readline() returns '' for text pipes and b''
        # for binary pipes. A truthiness test covers both; comparing
        # against '' alone never matches b'' and would spin forever on a
        # pipe opened in binary mode.
        if not output and process.poll() is not None:
            break
        if output:
            print(output.strip())
    rc = process.poll()
    return (output, rc)
def run_command(cmd_args,
                input=None,
                pipe_stdin=True,
                pipe_stdout=False,
                pipe_stderr=False,
                quiet=False):
    """Run a command to completion and return what it wrote.

    The process is started through ``run_command_async`` with the same
    piping options, fed ``input`` on stdin, and waited on.

    :param cmd_args: command and arguments to execute
    :param input: data passed to ``Popen.communicate`` for the child's stdin
    :param pipe_stdin: attach a pipe to the child's stdin
    :param pipe_stdout: capture the child's stdout
    :param pipe_stderr: capture the child's stderr
    :param quiet: capture both stdout and stderr
    :return: tuple ``(stdout, stderr)`` as returned by ``communicate``;
        an entry is ``None`` when that stream was not piped
    :raises Exception: if the process exits with a non-zero status; the
        message embeds the captured stderr
    """
    process = run_command_async(
        cmd_args,
        pipe_stdin=pipe_stdin,
        pipe_stdout=pipe_stdout,
        pipe_stderr=pipe_stderr,
        quiet=quiet)
    with process as proc:
        out, err = proc.communicate(input)
        if proc.poll():
            raise Exception('ffmpeg error: %s' % err)
        # Release the pipe handles explicitly for whichever streams exist.
        for stream in (proc.stdout, proc.stderr):
            if stream is not None:
                stream.close()
    return out, err