2021-08-30 15:31:37 +02:00
|
|
|
import re
|
|
|
|
import pexpect
|
|
|
|
# Compiled once at import time; used to strip terminal escape sequences from
# captured output before comparing it with expected text.
ansi_escape = re.compile(
    r'\x1B'                    # the ESC character
    r'(?:'
    r'[@-Z\\-_]'               # ESC followed by one character in @-Z or \-_
    r'|'
    r'\[[0-?]*[ -/]*[@-~]'     # ESC [ then chars in 0-?, chars in space-/, one char in @-~
    r')'
)
|
|
|
|
class ExpectationCollection:
    """Registry of expectations grouped by the pattern they wait for.

    Expectations sharing the same pattern are kept together in one list and
    are told apart by their ``context`` (the lines that precede the prompt).
    """

    def __init__(self):
        # Maps ``expectation.get_pattern()`` -> list of expectations that
        # share that pattern.
        self.expectations_lookup = {}

    def add_expectation(self, expectation):
        """Register *expectation* unless an equivalent one already exists.

        Two expectations are considered equivalent when they have the same
        pattern and the same ``context``.

        Returns:
            True if the expectation was added, False if it was rejected as a
            duplicate (a message is printed in that case).
        """
        # Compute the key once; every get_pattern() call builds a regex.
        pattern = expectation.get_pattern()
        registered = self.expectations_lookup.get(pattern, [])
        if expectation.context in [other.context for other in registered]:
            print(f'Can not add {expectation} to collection')
            return False
        # Bind a new list rather than appending in place, so lists handed out
        # earlier by get_expectations_by_pattern() are not mutated.
        self.expectations_lookup[pattern] = registered + [expectation]
        return True

    def add_expectation_from_descr(self, expectation, response=None, name=None):
        """Build an ``Expectation`` from a textual description and register it.

        Args:
            expectation: multi-line description passed to ``Expectation``.
            response: optional reply forwarded to ``Expectation``.
            name: optional name forwarded to ``Expectation``.

        Returns:
            The result of :meth:`add_expectation` (True when added).
        """
        expectation = Expectation(expectation, response=response, name=name)
        return self.add_expectation(expectation)

    def get_patterns(self):
        """Return ``pexpect.EOF``, ``pexpect.TIMEOUT`` and every registered pattern.

        EOF and TIMEOUT always occupy indices 0 and 1 of the returned list.
        """
        return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup.keys())

    def get_expectations_by_pattern(self, pattern):
        """Return the expectations registered under *pattern* (empty list if none)."""
        return self.expectations_lookup.get(pattern, [])

    def get_expectations_by_name(self, name):
        """Return every expectation, including chained ones, whose name equals *name*."""
        return [
            expectation
            for expectation in self.get_expectations(recursive=True)
            if expectation.name == name
        ]

    def get_exposed_expectation_names(self):
        """Return the truthy names of all expectations, including chained ones."""
        return [
            expectation.name
            for expectation in self.get_expectations(recursive=True)
            if expectation.name
        ]

    def set_expectation_response_by_name(self, name, response):
        """Set *response* on every expectation called *name*."""
        for expectation in self.get_expectations_by_name(name):
            expectation.set_response(response)

    def get_expectations(self, recursive=False):
        """Return the registered expectations without duplicates.

        Args:
            recursive: when True, also include everything reported by each
                expectation's ``get_following_expectations()``.

        Returns:
            A list in a stable order: registered expectations first (in
            registration order), then the chained ones.
        """
        expectations = [
            expectation
            for group in self.expectations_lookup.values()
            for expectation in group
        ]
        if recursive:
            # Materialise the followers first: extending the list while a
            # lazy generator is still iterating it would never terminate.
            followers = [
                follower
                for expectation in expectations
                for follower in expectation.get_following_expectations()
            ]
            expectations.extend(followers)
        # dict.fromkeys de-duplicates like set() but keeps first-seen order,
        # so the result is reproducible from run to run.
        return list(dict.fromkeys(expectations))

    def count_expectations(self):
        """Return the number of distinct expectations, chained ones included."""
        return len(self.get_expectations(recursive=True))

    def merge(self, collection):
        """Add the directly registered expectations of *collection* to this one.

        Chained expectations are not added separately; duplicates are rejected
        by :meth:`add_expectation`.
        """
        for expectation in collection.get_expectations():
            self.add_expectation(expectation)
|
|
|
|
|
|
|
|
|
|
|
|
class Expectation:
    """A prompt to wait for on a spawned process, plus an optional reply.

    The description passed to the constructor is split into lines: the last
    non-blank line is the text to wait for (``pattern``); all non-blank lines
    together form ``context``. Expectations can be chained through ``next``
    so that a reply leads on to a follow-up prompt.
    """

    def __init__(self, multiline_pattern, response=None, name=None):
        """Create an expectation from a (possibly multi-line) description.

        Args:
            multiline_pattern: text whose last non-blank line is the prompt to
                wait for. The prompt may contain ``{variable}``, which is
                replaced by the previous answer (see :meth:`get_pattern`).
            response: text to send when the prompt shows up, or None to send
                nothing.
            name: optional identifier used to look the expectation up.

        Raises:
            IndexError: if *multiline_pattern* contains no non-blank line.
        """
        self.name = name
        stripped_lines = (line.strip() for line in multiline_pattern.strip().split('\n'))
        self.context = [line for line in stripped_lines if line]
        self.pattern = self.context[-1]
        self.response = response
        # Maps a trigger (a response value, or None as the fallback) to the
        # expectation that should be awaited after answering.
        self.next = {}

    def __repr__(self):
        return (
            f'{type(self).__name__}(pattern={self.pattern!r}, '
            f'response={self.response!r}, name={self.name!r})'
        )

    def _format_pattern(self, previous_answer=None):
        """Return the prompt text with ``{variable}`` filled in by *previous_answer*."""
        return self.pattern.format(variable=previous_answer)

    def get_pattern(self, previous_answer=None):
        """Return a compiled regex matching the prompt anywhere within a line.

        The prompt text is regex-escaped, so it is matched literally.
        """
        literal = re.escape(self._format_pattern(previous_answer))
        return re.compile(f'(.*){literal}(.*)')

    def set_next_expectation(self, expectation, triggers=None):
        """Chain *expectation* after this one.

        Args:
            expectation: the expectation to await after answering.
            triggers: a response value, or a list of them, that selects this
                follow-up. None (the default) registers the fallback that is
                used when no specific trigger matches the response.
        """
        if not isinstance(triggers, list):
            triggers = [triggers]
        for trigger in triggers:
            self.next[trigger] = expectation

    def set_response(self, response):
        """Replace the reply that is sent when the prompt shows up."""
        print(f'Setting {response} for {self.pattern}')
        self.response = response

    def expect(self, spawned, previous_answer=None):
        """Wait for the prompt on *spawned* and answer it when it appears.

        Args:
            spawned: object offering ``expect``, ``sendline``, ``before`` and
                ``after`` (presumably a ``pexpect`` spawn - confirm with caller).
            previous_answer: value substituted for ``{variable}`` in the prompt.

        On EOF or TIMEOUT nothing is sent; the captured output is printed.
        """
        print(f'-> expecting next "{self._format_pattern(previous_answer)}"')
        candidates = [
            pexpect.EOF,
            pexpect.TIMEOUT,
            self.get_pattern(previous_answer=previous_answer),
        ]
        matched = spawned.expect(candidates)
        # Indices 0 and 1 are EOF and TIMEOUT; anything else is the prompt.
        if matched not in (0, 1):
            self.answer(spawned)
        else:
            print(f'-> before: {spawned.before}\n-> after: {spawned.after}\n-> {self.pattern}')

    def answer(self, spawned):
        """Send the response (if any) and continue with the chained expectation.

        The follow-up registered for the exact response wins; otherwise the
        fallback registered under None is used, if present.
        """
        print(f'-> answering "{self.response}" to "{spawned.after}"')
        if self.response is not None:
            spawned.sendline(self.response)
        if self.response in self.next:
            self.next[self.response].expect(spawned, previous_answer=self.response)
        elif None in self.next:
            self.next[None].expect(spawned, previous_answer=self.response)

    def is_the_one(self, context):
        """Tell whether *context* ends with the lines this expectation requires.

        *context* is split on CRLF, blank lines are dropped, and the trailing
        lines are compared (after removing ANSI escape sequences) with this
        expectation's context lines, walking backwards.

        Returns:
            False when *context* has no non-blank line, has fewer lines than
            required, or any compared line differs; True otherwise.
        """
        lines = [line.strip() for line in context.strip().split('\r\n') if line.strip()]
        if not lines:
            print('No context provided')
            return False
        lines.reverse()
        # Context lines preceding the prompt, nearest first.
        # NOTE(review): for a single-line description the start index is -1,
        # so this slice yields the prompt line itself instead of an empty
        # list - confirm that this is intended.
        expected = self.context[len(self.context) - 2::-1]
        # Too little context can never match; without this guard the
        # comparison below would run past the end of ``lines``.
        if len(lines) < len(expected):
            return False
        return all(
            wanted == ansi_escape.sub('', seen)
            for wanted, seen in zip(expected, lines)
        )

    def get_following_expectations(self):
        """Return every expectation reachable through ``next``, each one once.

        The chain is walked depth-first in trigger registration order. Cycles
        (for example a retry prompt that leads back to an earlier one) are
        handled: an expectation already collected is not visited again. This
        expectation itself is included only if a cycle leads back to it.
        """
        following = []
        seen = set()
        # Stack of expectations still to visit; reversed so that pop() yields
        # them in registration order.
        pending = list(self.next.values())
        pending.reverse()
        while pending:
            candidate = pending.pop()
            if candidate in seen:
                continue
            seen.add(candidate)
            following.append(candidate)
            pending.extend(reversed(list(candidate.next.values())))
        return following
|