pexpectation/pexpectation.py

106 lines
4.4 KiB
Python

import re
import pexpect
# Matches ANSI terminal escape sequences: ESC followed either by a single
# character in the ranges @-Z or \-_, or by a CSI sequence ("[", then any
# number of parameter characters 0-?, any number of intermediate characters
# space-/, and one final character @-~). It is used to strip such sequences
# from captured process output before that output is compared with the
# expected text.
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
class ExpectationCollection:
    """Registry of expectations, indexed by the compiled regex of their prompt.

    Several expectations may share one pattern as long as their ``context``
    (the lines that lead up to the prompt) differs; an expectation whose
    pattern *and* context are already registered is rejected.
    """

    def __init__(self):
        # Maps compiled pattern -> list of expectations registered for it.
        self.expectations_lookup = {}

    def add_expectation(self, expectation):
        """Register *expectation*.

        Returns:
            True when the expectation was stored, False when an expectation
            with the same pattern and the same context already exists.
        """
        pattern = expectation.get_pattern()
        registered = self.expectations_lookup.get(pattern, [])
        if any(existing.context == expectation.context for existing in registered):
            print(f'Can not add {expectation} to collection')
            return False
        # Rebind instead of appending in place: get_expectations_by_pattern
        # hands out the stored list, and lists handed out earlier must not
        # change under the caller.
        self.expectations_lookup[pattern] = registered + [expectation]
        return True

    def add_expectation_from_descr(self, expectation, response=None, name=None):
        """Build an ``Expectation`` from a textual description and register it.

        Args:
            expectation: multi-line description passed to ``Expectation``.
            response: reply associated with the expectation.
            name: optional name under which the expectation is exposed.

        Returns:
            The result of ``add_expectation`` (True if stored, False if it
            was rejected as a duplicate).
        """
        return self.add_expectation(Expectation(expectation, response=response, name=name))

    def get_patterns(self):
        """Return every pattern to wait for.

        ``pexpect.EOF`` and ``pexpect.TIMEOUT`` always come first, so the
        registered patterns start at index 2 of the returned list.
        """
        return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup)

    def get_expectations_by_pattern(self, pattern):
        """Return the expectations registered for *pattern* (empty list if none)."""
        return self.expectations_lookup.get(pattern, [])

    def get_expectations_by_name(self, name):
        """Return all expectations, follow-ups included, whose name equals *name*."""
        return [exp for exp in self.get_expectations(recursive=True) if exp.name == name]

    def get_exposed_expectation_names(self):
        """Return the names of all named expectations, follow-ups included."""
        return [exp.name for exp in self.get_expectations(recursive=True) if exp.name]

    def set_expectation_response_by_name(self, name, response):
        """Set *response* on every expectation called *name*."""
        for expectation in self.get_expectations_by_name(name):
            expectation.set_response(response)

    def get_expectations(self, recursive=False):
        """Return the registered expectations without duplicates.

        Args:
            recursive: when true, also include the expectations reachable
                through each registered expectation's follow-up chain.

        The result keeps registration order (follow-ups after the directly
        registered expectations) so that callers get a stable ordering.
        """
        expectations = [exp for exps in self.expectations_lookup.values() for exp in exps]
        if recursive:
            following = [
                follower
                for exp in expectations
                for follower in exp.get_following_expectations()
            ]
            expectations.extend(following)
        # dict.fromkeys removes duplicates like set() would, but keeps order.
        return list(dict.fromkeys(expectations))

    def count_expectations(self):
        """Return the number of distinct expectations, follow-ups included."""
        return len(self.get_expectations(recursive=True))

    def merge(self, collection):
        """Register every directly registered expectation of *collection* here.

        Expectations rejected as duplicates are skipped; follow-ups travel
        along with the expectation that references them.
        """
        for expectation in collection.get_expectations():
            self.add_expectation(expectation)
class Expectation:
    """A prompt to wait for on a spawned process and the reply to send.

    An expectation is described by a multi-line string. Its last non-blank
    line is the prompt text to wait for; the non-blank lines before it are
    the output lines that must directly precede the prompt (checked by
    ``is_the_one``). Expectations can be chained with
    ``set_next_expectation`` so that a reply leads on to a follow-up prompt.
    """

    def __init__(self, multiline_pattern, response=None, name=None):
        """Create an expectation.

        Args:
            multiline_pattern: description of the prompt; blank lines and
                surrounding whitespace on each line are ignored.
            response: text sent when the prompt shows up; ``None`` sends
                nothing.
            name: optional label used to look the expectation up later.

        Raises:
            IndexError: if *multiline_pattern* contains no non-blank line.
        """
        self.name = name
        self.context = [p.strip() for p in multiline_pattern.strip().split('\n') if p.strip()]
        self.pattern = self.context[-1]
        self.response = response
        # Maps a trigger (the response that was sent, or None as a
        # catch-all) to the expectation to wait for next.
        self.next = {}

    def __repr__(self):
        return f'{type(self).__name__}(name={self.name!r}, context={self.context!r})'

    def get_pattern(self, previous_answer=None):
        """Return the compiled regex that matches a line containing the prompt.

        The prompt text is passed through ``str.format`` with
        ``variable=previous_answer`` before being escaped, so ``{variable}``
        in the description is replaced by the previous answer (or by the
        text ``None`` when there is none). Any other brace in the
        description must be doubled, otherwise ``str.format`` raises.
        """
        prompt = self.pattern.format(variable=previous_answer)
        return re.compile(f'(.*){re.escape(prompt)}(.*)')

    def set_next_expectation(self, expectation, triggers=None):
        """Chain *expectation* after this one.

        Args:
            expectation: the expectation to wait for after answering.
            triggers: a response value, or a list of them, that selects
                this follow-up. ``None`` (the default) registers it as the
                fallback used when no specific trigger matches.
        """
        if not isinstance(triggers, list):
            triggers = [triggers]
        for trigger in triggers:
            self.next[trigger] = expectation

    def set_response(self, response):
        """Replace the reply sent when this expectation is met."""
        print(f'Setting {response} for {self.pattern}')
        self.response = response

    def expect(self, spawned, previous_answer=None):
        """Wait for the prompt on *spawned* and answer it when it shows up.

        Args:
            spawned: object offering pexpect's ``expect``/``sendline``/
                ``before``/``after`` interface (presumably a
                ``pexpect.spawn`` instance).
            previous_answer: value substituted for ``{variable}`` in the
                prompt text.

        On EOF or timeout nothing is sent; the captured output is printed
        for diagnosis instead.
        """
        print(f'-> expecting next "{self.pattern.format(variable=previous_answer)}"')
        matched = spawned.expect(
            [pexpect.EOF, pexpect.TIMEOUT, self.get_pattern(previous_answer=previous_answer)]
        )
        if matched in (0, 1):
            # Index 0 is EOF and index 1 is TIMEOUT: the prompt never came.
            print(f'-> before: {spawned.before}\n-> after: {spawned.after}\n-> {self.pattern}')
            return
        self.answer(spawned)

    def answer(self, spawned):
        """Send the response (if any) and continue with the follow-up.

        The follow-up registered for the response that was sent takes
        precedence; otherwise the fallback registered under ``None`` is
        used; if neither exists the chain ends here.
        """
        print(f'-> answering "{self.response}" to "{spawned.after}"')
        if self.response is not None:
            spawned.sendline(self.response)
        if self.response in self.next:
            self.next[self.response].expect(spawned, previous_answer=self.response)
        elif None in self.next:
            self.next[None].expect(spawned, previous_answer=self.response)

    def is_the_one(self, context):
        """Tell whether the output preceding the prompt fits this expectation.

        Args:
            context: output captured before the prompt line, with lines
                separated by ``\\r\\n``.

        Returns:
            True when the last non-blank lines of *context*, with ANSI
            escape sequences removed, equal the description lines that
            precede the prompt. An expectation without preceding lines fits
            any non-empty context. False when *context* has no non-blank
            line or has fewer lines than required.
        """
        # Remove escape sequences first so that whitespace wrapped in
        # colour codes is stripped as well.
        cleaned = (line.strip() for line in ansi_escape.sub('', context).split('\r\n'))
        lines = [line for line in cleaned if line]
        if not lines:
            print('No context provided')
            return False
        # Everything but the prompt line itself must precede the prompt.
        preceding = self.context[:-1]
        if len(lines) < len(preceding):
            return False
        # Explicit start index: lines[-0:] would be the whole list.
        return lines[len(lines) - len(preceding):] == preceding

    def get_following_expectations(self):
        """Return every expectation reachable through the follow-up chain.

        Each expectation is listed once, in depth-first order of
        registration. Cycles (for example a prompt that leads back to an
        earlier one) are followed only once, so this expectation itself is
        part of the result when the chain loops back to it.
        """
        following = []
        seen = set()
        pending = list(self.next.values())[::-1]
        while pending:
            candidate = pending.pop()
            if candidate in seen:
                continue
            seen.add(candidate)
            following.append(candidate)
            pending.extend(list(candidate.next.values())[::-1])
        return following