하위 프로세스에서 비 블로킹 읽기 파이썬의 PIPE
하위 프로세스 모듈 을 사용하여 하위 프로세스 를 시작하고 출력 스트림 (stdout)에 연결합니다. stdout에서 비 블로킹 읽기를 실행할 수 있기를 원합니다. .readline을 차단하지 않거나 호출하기 전에 스트림에 데이터가 있는지 확인하는 방법이 .readline
있습니까? 이식성이 좋거나 Windows 및 Linux에서 작동하고 싶습니다.
여기에 내가 지금하는 방법이 있습니다 (사용 가능한 .readline
데이터가없는 경우 차단됩니다 ).
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
fcntl
, select
, asyncproc
이 경우 도움이되지 않습니다.
운영 체제와 상관없이 차단하지 않고 스트림을 읽는 안정적인 방법은 다음을 사용하는 것입니다 Queue.get_nowait()
.
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
나는 종종 비슷한 문제를 겪었다. 필자가 자주 쓰는 Python 프로그램은 명령 줄 (stdin)에서 사용자 입력을받는 동시에 일부 기본 기능을 실행할 수 있어야합니다. 사용자 입력 처리 기능을 다른 스레드에 넣는 것만으로는 문제가 해결 readline()
되지 않으며 시간 제한이 없기 때문 입니다. 기본 기능이 완료되고 더 이상 사용자 입력을 기다릴 필요가 없으면 일반적으로 프로그램을 종료하고 싶지만 readline()
다른 스레드에서 여전히 라인을 기다리는 중이므로 차단할 수 없습니다 . 이 문제에 대한 해결책은 fcntl 모듈을 사용하여 stdin을 비 차단 파일로 만드는 것입니다.
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
내 의견으로는 이것은이 문제를 해결하기 위해 선택 또는 신호 모듈을 사용하는 것보다 조금 더 깨끗하지만 다시 UNIX에서만 작동합니다 ...
파이썬 3.4는 비동기 IO 모듈을 위한 새로운 임시 API 를 도입했습니다 .asyncio
이 접근 방식은 twisted
@Bryan Ward의 답변 과 유사 합니다. 프로토콜을 정의하면 데이터가 준비되는 즉시 해당 메소드가 호출됩니다.
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
docs의 "Subprocess"를 참조하십시오 .
코 루틴 ( / Python 3.5+ 구문 사용 )을 사용하여 비동기식으로 행을 읽을 수있는 객체asyncio.create_subprocess_exec()
를 반환하는 고급 인터페이스 가 있습니다 .Process
StreamReader.readline()
async
await
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
다음 작업을 수행합니다.
- 하위 프로세스를 시작하고 stdout을 파이프로 리디렉션
- 서브 프로세스의 stdout에서 행을 비동기 적으로 읽습니다.
- 하위 프로세스 종료
- 종료 될 때까지 기다리십시오
필요한 경우 각 단계는 시간 초과로 제한 될 수 있습니다.
asyncproc 모듈을 사용해보십시오 . 예를 들면 다음과 같습니다.
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
모듈은 S.Lott에서 제안한대로 모든 스레딩을 처리합니다.
Twisted 에서이 작업을 쉽게 수행 할 수 있습니다 . 기존 코드 기반에 따라 사용하기 쉽지 않을 수도 있지만, 트위스트 된 응용 프로그램을 구축하는 경우 이와 같은 것이 거의 사소 해집니다. ProcessProtocol
클래스를 만들고 outReceived()
메서드를 재정의합니다 . 트위스트 (사용 된 리액터에 따라 다름)는 일반적으로 select()
다른 파일 디스크립터 (종종 네트워크 소켓)의 데이터를 처리하기 위해 콜백이 설치된 큰 루프입니다. 따라서이 outReceived()
방법은에서 오는 데이터를 처리하기위한 콜백을 설치하는 것입니다 STDOUT
. 이 동작을 보여주는 간단한 예는 다음과 같습니다.
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
트위스트 문서는 이에 대한 좋은 정보가 있습니다.
Twisted를 중심으로 전체 응용 프로그램을 빌드하면 로컬 또는 원격의 다른 프로세스와 비동기식으로 통신 할 수 있습니다. 반면에, 프로그램이 Twisted를 기반으로 구축되지 않은 경우 실제로 그렇게 도움이되지는 않습니다. 이 방법이 특정 응용 프로그램에 적용되지 않더라도 다른 독자에게 도움이 되길 바랍니다.
select & read (1)를 사용하십시오.
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
readline ()과 같은 경우 :
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
한 가지 해결책은 프로세스를 읽도록 다른 프로세스를 만들거나 시간 초과로 프로세스 스레드를 만드는 것입니다.
다음은 타임 아웃 함수의 스레드 버전입니다.
http://code.activestate.com/recipes/473878/
그러나 들어올 때 stdout을 읽어야합니까? 다른 해결책은 출력을 파일로 덤프하고 p.wait () 사용하여 프로세스가 완료 될 때까지 기다리는 것입니다 .
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
면책 조항 : 이것은 토네이도에만 작동합니다
fd를 비 블로킹으로 설정 한 다음 ioloop를 사용하여 콜백을 등록하면됩니다. tornado_subprocess 라는 달걀에 이것을 패키지로 만들었 으며 PyPI를 통해 설치할 수 있습니다.
easy_install tornado_subprocess
이제 다음과 같이 할 수 있습니다 :
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
RequestHandler와 함께 사용할 수도 있습니다.
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
기존 솔루션이 효과가 없었습니다 (아래 세부 정보 참조). 마지막으로 작동 한 것은 read (1) ( 이 답변을 기반으로)을 사용하여 readline을 구현하는 것이 었습니다 . 후자는 차단하지 않습니다.
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
기존 솔루션이 작동하지 않는 이유 :
- 큐 라인을 포함하여 readline이 필요한 솔루션은 항상 차단됩니다. readline을 실행하는 스레드를 강제 종료하는 것은 어렵습니다 (불가능합니까?). 생성 된 프로세스가 완료 될 때만 종료되지만 출력 생성 프로세스가 종료 된 경우에는 종료되지 않습니다.
- anonnn이 지적한 것처럼 저수준 fcntl과 고수준 readline 호출을 혼합하면 제대로 작동하지 않을 수 있습니다.
- select.poll () 사용은 깔끔하지만 파이썬 문서에 따라 Windows에서는 작동하지 않습니다.
- 타사 라이브러리를 사용하면이 작업이 과도하게 수행되고 종속성이 추가됩니다.
다음은 부분 라인을 포함하여 서브 프로세스 ASAP의 모든 출력을 포착하는 데 사용되는 코드입니다. 그것은 동시에 올바른 순서로 stdout과 stderr를 펌핑합니다.
Python 2.7 Linux 및 Windows에서 올바르게 테스트되었습니다.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
이 버전의 비 차단 읽기 에는 특별한 모듈이 필요 하지 않으며 대부분의 Linux 배포판에서 기본적으로 작동합니다.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
이 문제를 추가하여 일부 하위 프로세스를 읽습니다. 비 차단 읽기 솔루션은 다음과 같습니다.
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Windows 및 Unix에서 비 차단 파이프를 설정하는 기능을 제공하므로 여기에이 답변을 추가하십시오.
모든 ctypes
세부 사항은 @techtonik의 답변 덕분 입니다.
Unix 및 Windows 시스템에서 사용할 약간 수정 된 버전이 있습니다.
- Python3 호환 (사소한 변경 만 필요) .
- posix 버전을 포함하고 둘 중 하나에 사용할 예외를 정의합니다.
이런 식으로 Unix 및 Windows 코드에 대해 동일한 기능과 예외를 사용할 수 있습니다.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
불완전한 데이터를 읽지 않기 위해 필자는 독자적인 readline 생성기 (각 줄의 바이트 문자열을 반환)를 작성했습니다.
예를 들어 발전기입니다.
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
원래 질문자의 문제가 있지만 스레드를 호출하고 싶지 않았습니다. 나는 Jesse의 솔루션과 파이프의 직접 read ()를 혼합하고 라인 읽기를위한 자체 버퍼 핸들러 (그러나 하위 프로세스-ping-는 항상 시스템 페이지 크기의 전체 라인을 썼습니다). gobject에 등록 된 io 시계 만 읽음으로써 바쁜 대기를 피할 수 있습니다. 요즘에는 스레드를 피하기 위해 일반적으로 gobject MainLoop 내에서 코드를 실행합니다.
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
감시자는
def watch(f, *other):
print 'reading',f.read()
return True
그리고 메인 프로그램은 핑을 설정 한 다음 gobject 메일 루프를 호출합니다.
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
다른 작업은 gobject의 콜백에 첨부됩니다.
선택 모듈은 다음 유용한 입력이 위치를 결정하는 데 도움이됩니다.
그러나 별도의 스레드가 있으면 거의 항상 더 행복합니다. 하나는 stdin을 차단하고 다른 하나는 차단하고 싶지 않은 곳이면 어디든지 수행합니다.
왜 스레드와 큐를 귀찮게합니까? readline ()과 달리 BufferedReader.read1 ()은 \ r \ n 대기를 차단하지 않으며 출력이 들어 오면 최대한 빨리 반환합니다.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
필자의 경우 백그라운드 응용 프로그램의 출력을 잡아서 타임 스탬프, 색상 등을 추가하는 로깅 모듈이 필요했습니다.
실제 I / O를 수행하는 백그라운드 스레드로 끝났습니다. 다음 코드는 POSIX 플랫폼 전용입니다. 불필요한 부분을 제거했습니다.
누군가가이 짐승을 장기간 사용하려면 오픈 디스크립터 관리를 고려하십시오. 제 경우에는 큰 문제가 아니 었습니다.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
실행중인 프로세스에서 stdout과 stderr을 모두 수집하고 싶었지만 결과는 위젯에서 생성 된대로 렌더링하려고했기 때문에 내 문제는 조금 다릅니다.
다른 스크립트를 실행하고 출력을 수집하는 것과 같은 일반적인 작업을 수행 할 필요가 없기 때문에 대기열 또는 추가 스레드를 사용하여 제안 된 많은 해결 방법에 의존하고 싶지 않았습니다.
제안 된 솔루션과 파이썬 문서를 읽은 후 아래 구현과 관련된 문제를 해결했습니다. 예, select
함수 호출을 사용하고 있기 때문에 POSIX에서만 작동합니다 .
나는 문서가 혼란스럽고 구현이 그러한 일반적인 스크립팅 작업에 어색하다는 데 동의합니다. 이전 버전의 파이썬은 기본값이 Popen
다르고 설명이 다르므로 많은 혼란을 겪었습니다. 이것은 Python 2.7.12 및 3.5.2 모두에서 잘 작동하는 것 같습니다.
핵심은 bufsize=1
라인 버퍼링 을 설정 한 다음 universal_newlines=True
이진 파일 대신 텍스트 파일로 처리하여 설정시 기본값이되는 것 bufsize=1
입니다.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG 및 VERBOSE는 단순히 출력을 터미널로 인쇄하는 매크로입니다.
이 솔루션은 여전히 블로킹 readline
기능을 사용하므로 IMHO 99.99 %의 효과가 있으므로 하위 프로세스가 훌륭하고 완전한 라인을 출력한다고 가정합니다.
파이썬을 처음 접하면서 솔루션을 개선하기위한 피드백을 환영합니다.
JF Sebastian의 솔루션을 기반으로 라이브러리를 만들었습니다 . 사용할 수 있습니다.
https://github.com/cenkalti/what
JF Sebastian의 답변 및 여러 다른 소스에서 작업하여 간단한 하위 프로세스 관리자를 구성했습니다. 요청 비 차단 읽기를 제공 할뿐 아니라 여러 프로세스를 병렬로 실행합니다. 그것은 내가 아는 OS 특정 호출을 사용하지 않으므로 어디서나 작동해야합니다.
pypi에서 사용할 수 있습니다 pip install shelljob
. 예제와 전체 문서 는 프로젝트 페이지 를 참조하십시오 .
편집 :이 구현은 여전히 차단됩니다. 대신 JFSebastian의 대답을 사용하십시오 .
나는 최고의 대답을 시도했지만
스레드 코드의 추가 위험과 유지 관리는 걱정 스럽습니다.
io 모듈을
통해
(및 2.6으로 제한됨) BufferedReader를 발견했습니다. 이것은 내 스레드리스 비 차단 솔루션입니다.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
비 최근 블로킹 모드에서 스트림 (하위 프로세스에서 꼬리 실행)에서 한 번에 한 줄을 읽어야하는 동일한 문제에 대해 최근에 우연히 만났으며 다음 문제를 피하고 싶었습니다. readline처럼) 등
여기 내 구현이 https://gist.github.com/grubberr/5501e1a9760c3eab5e0a 창 (폴)을 지원하지 않고 EOF를 처리하지 않지만 잘 작동합니다.
이것은 서브 프로세스에서 대화식 명령을 실행하는 예제이며, 의사 터미널을 사용하여 stdout이 대화식입니다. https://stackoverflow.com/a/43012138/3555925를 참조하십시오.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
이 솔루션은 select
모듈을 사용하여 IO 스트림에서 "사용 가능한 데이터를 읽습니다". 이 함수는 데이터를 사용할 수있을 때까지 처음에는 차단하지만 사용 가능한 데이터 만 읽고 더 이상 차단하지 않습니다.
select
모듈을 사용한다는 점을 고려하면 유닉스에서만 작동합니다.
이 코드는 PEP8을 완전히 준수합니다.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
또한 Jesse 가 설명한 문제에 직면하여 Bradley , Andy 및 다른 사람들이했던 것처럼 " 루프"를 사용하여 바쁜 루프를 피하기 위해 차단 모드를 사용하여 문제를 해결했습니다 . 가짜 파이프로 더미 파이프를 사용합니다. 선택은 차단되고 stdin 또는 파이프가 준비 될 때까지 기다립니다. 키를 눌렀을 때 stdin이 차단을 해제하면 키 값을 read (1)로 검색 할 수 있습니다. 다른 스레드가 파이프에 쓰면 파이프가 선택을 차단 해제하고 stdin이 필요하다는 표시로 사용할 수 있습니다. 다음은 몇 가지 참조 코드입니다.
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
현대 파이썬에서는 상황이 훨씬 좋습니다.
다음은 간단한 하위 프로그램 인 "hello.py"입니다.
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
그리고 그것과 상호 작용하는 프로그램 :
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
인쇄됩니다 :
b'hello bob\n'
b'hello alice\n'
여기와 관련 질문 모두에서 거의 모든 이전 답변에 의한 실제 패턴은 자식의 stdout 파일 설명자를 비 차단으로 설정 한 다음 일종의 선택 루프에서 폴링하는 것입니다. 요즘, 그 루프는 asyncio에 의해 제공됩니다.
파이썬에서 비 차단 읽기 및 백그라운드 쓰기를 지원하는 모듈은 다음과 같습니다.
https://pypi.python.org/pypi/python-nonblock
기능을 제공합니다
nonblock_read : 사용 가능한 경우 스트림에서 데이터를 읽거나 그렇지 않으면 빈 문자열을 반환합니다 (또는 스트림이 반대쪽에서 닫혀 있고 가능한 모든 데이터를 읽은 경우 없음)
python-subprocess2 모듈을 고려할 수도 있습니다.
https://pypi.python.org/pypi/python-subprocess2
하위 프로세스 모듈에 추가됩니다. 따라서 "subprocess.Popen"에서 리턴 된 오브젝트에 추가 메소드 runInBackground가 추가됩니다. 이것은 스레드를 시작하고 주 스레드를 차단하지 않고 stdout / stderr에 물건을 쓸 때 자동으로 채워지는 객체를 반환합니다.
즐겨!
참고 URL : https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
'Programming' 카테고리의 다른 글
Rails에서 404로 리디렉션하는 방법은 무엇입니까? (0) | 2020.02.14 |
---|---|
Base64로 배경 이미지 데이터를 CSS에 포함시키는 것이 좋거나 나쁜 습관입니까? (0) | 2020.02.14 |
matplotlib에서 플롯을 지우는 데 cla (), clf () 또는 close ()를 사용하는 경우는 언제입니까? (0) | 2020.02.14 |
ES6 클래스 변수 대안 (0) | 2020.02.14 |
잠금은 정확히 어떻게 작동합니까? (0) | 2020.02.14 |