100 lines
3.6 KiB
Python
100 lines
3.6 KiB
Python
|
import re
|
||
|
import pexpect
|
||
|
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
||
|
|
||
|
class ExpectationCollection:
|
||
|
def __init__(self):
|
||
|
self.expectations_lookup = {}
|
||
|
|
||
|
def add_expectation(self, expectation):
|
||
|
if expectation.get_pattern() in self.expectations_lookup:
|
||
|
if expectation.context in [exp.context for exp in self.expectations_lookup[expectation.get_pattern()]]:
|
||
|
print(f'Can not add {expectation} to collection')
|
||
|
return False
|
||
|
self.expectations_lookup[expectation.get_pattern()] = self.expectations_lookup.setdefault(expectation.get_pattern(), []) + [expectation]
|
||
|
return True
|
||
|
|
||
|
def add_expectation_from_descr(self, expectation, response=None, name=None):
|
||
|
expectation = Expectation(expectation, response=response, name=name)
|
||
|
self.add_expectation(expectation)
|
||
|
|
||
|
def get_patterns(self):
|
||
|
return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup.keys())
|
||
|
|
||
|
def get_expectations_by_pattern(self, pattern):
|
||
|
return self.expectations_lookup.get(pattern, [])
|
||
|
|
||
|
def get_expectation_by_name(self, name):
|
||
|
return [expectation.name for expectation in self.get_expectations(recursive=True)]
|
||
|
|
||
|
def get_expectations(self, recursive=False):
|
||
|
expectations = [exp for exps in self.expectations_lookup.values() for exp in exps]
|
||
|
if recursive:
|
||
|
expectations.extend([exp for exps in expectations for exp in exps.get_following_expectations()])
|
||
|
pass
|
||
|
return expectations
|
||
|
|
||
|
def count_expectations(self):
|
||
|
return sum([exp.count_expectations() for exp in self.get_expectations()])
|
||
|
|
||
|
def merge(self, collection):
|
||
|
for expectation in collection.get_expectations():
|
||
|
self.add_expectation(expectation)
|
||
|
|
||
|
|
||
|
class Expectation:
|
||
|
def __init__(self, multiline_pattern, response=None, name=None):
|
||
|
self.name = name
|
||
|
self.context = [p.strip() for p in multiline_pattern.strip().split('\n') if p.strip()]
|
||
|
self.pattern = self.context[-1]
|
||
|
self.response = response
|
||
|
self.next = None
|
||
|
|
||
|
def get_pattern(self):
|
||
|
return re.compile(f'(.*){re.escape(self.pattern)}(.*)')
|
||
|
|
||
|
def set_next_expectation(self, expectation):
|
||
|
self.next = expectation
|
||
|
|
||
|
def set_response(self, response):
|
||
|
self.response = response
|
||
|
|
||
|
def expect(self, spawned):
|
||
|
print(f'-> expecting next "{self.pattern}"')
|
||
|
p = spawned.expect([pexpect.EOF, pexpect.TIMEOUT, self.get_pattern()])
|
||
|
if p not in [0, 1]:
|
||
|
self.answer(spawned)
|
||
|
else:
|
||
|
print(f'-> before: {spawned.before}\n-> after: {spawned.after}\n-> {self.pattern}')
|
||
|
|
||
|
def answer(self, spawned):
|
||
|
print(f'-> answering "{self.response}" to "{spawned.after}"')
|
||
|
if self.response is not None:
|
||
|
spawned.sendline(self.response)
|
||
|
if self.next:
|
||
|
self.next.expect(spawned)
|
||
|
|
||
|
def is_the_one(self, context):
|
||
|
#print(f'testing if it is the one {self.get_pattern()} for {context}')
|
||
|
context = [l.strip() for l in context.strip().split('\r\n') if l.strip()]
|
||
|
if not context:
|
||
|
print('No context provided')
|
||
|
return False
|
||
|
context.reverse()
|
||
|
for index, c in enumerate(self.context[len(self.context)-2::-1]):
|
||
|
if c != ansi_escape.sub('', context[index]):
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def count_expectations(self):
|
||
|
count = 1
|
||
|
if self.next:
|
||
|
count += self.next.count_expectations()
|
||
|
return count
|
||
|
|
||
|
def get_following_expectations(self):
|
||
|
if self.next:
|
||
|
return [self.next] + self.next.get_following_expectations()
|
||
|
return []
|
||
|
|