Source code for cptree.watcher
# invoke watcher
import re
from typing import Generator
from invoke.watchers import StreamWatcher
from .common import parse_int
[docs]
class LineWatcher(StreamWatcher):
def __init__(self, *, file_callback=None, progress_callback=None, line_callback=None):
self.index = 0
self.file_pattern = re.compile(r"^~([^\s]+)\s([0-9,]+)\s(.*)")
self.percent_pattern = re.compile(r"^\s*([^\s]+)\s+([0-9\.]+)%")
self.file_callback = file_callback
self.progress_callback = progress_callback
self.line_callback = line_callback
self.file_count = 0
self.byte_count = 0
super().__init__()
[docs]
def parse_line(self, line):
if self.line_callback:
return self.line_callback(line)
file = self.file_pattern.match(line)
if file:
codes, length, filename = file.groups()
self.file_count += 1
return self.file_callback(filename, self.file_count, parse_int(length), codes)
percent = self.percent_pattern.match(line)
if percent:
length, progress = percent.groups()
length = parse_int(length)
chunk = length - self.byte_count
self.byte_count = length
return self.progress_callback(chunk, progress)
[docs]
def submit(self, stream: str) -> Generator[str, None, None]:
while True:
try:
pos = stream.index("\n", self.index)
except ValueError:
try:
pos = stream.index("\r", self.index)
except ValueError:
return
chunk = stream[self.index : pos] # noqa: E203
self.index = pos + 1
chunks = chunk.split("\n")
for chunk in chunks:
for line in chunk.split("\r"):
if len(line):
self.parse_line(line)
yield ""