import re import pexpect ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') class ExpectationCollection: def __init__(self): self.expectations_lookup = {} def add_expectation(self, expectation): if expectation.get_pattern() in self.expectations_lookup: if expectation.context in [exp.context for exp in self.expectations_lookup[expectation.get_pattern()]]: print(f'Can not add {expectation} to collection') return False self.expectations_lookup[expectation.get_pattern()] = self.expectations_lookup.setdefault(expectation.get_pattern(), []) + [expectation] return True def add_expectation_from_descr(self, expectation, response=None, name=None): expectation = Expectation(expectation, response=response, name=name) self.add_expectation(expectation) def get_patterns(self): return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup.keys()) def get_expectations_by_pattern(self, pattern): return self.expectations_lookup.get(pattern, []) def get_expectations_by_name(self, name): return [expectation for expectation in self.get_expectations(recursive=True) if expectation.name == name] def get_exposed_expectation_names(self): return [expectation.name for expectation in self.get_expectations(recursive=True) if expectation.name] def set_expectation_response_by_name(self, name, response): for expectation in self.get_expectations_by_name(name): expectation.set_response(response) def get_expectations(self, recursive=False): expectations = [exp for exps in self.expectations_lookup.values() for exp in exps] if recursive: expectations.extend([exp for exps in expectations for exp in exps.get_following_expectations()]) return list(set(expectations)) def count_expectations(self): return len(self.get_expectations(recursive=True)) def merge(self, collection): for expectation in collection.get_expectations(): self.add_expectation(expectation) class Expectation: def __init__(self, multiline_pattern, response=None, name=None): self.name = name self.context = [p.strip() for p in multiline_pattern.strip().split('\n') if p.strip()] self.pattern = self.context[-1] self.response = response self.next = {} def get_pattern(self, previous_answer=None): return re.compile(f'(.*){re.escape(self.pattern.format(variable=previous_answer))}(.*)') def set_next_expectation(self, expectation, triggers=None): if not isinstance(triggers, list): triggers = [triggers] for trigger in triggers: self.next[trigger] = expectation def set_response(self, response): print(f'Setting {response} for {self.pattern}') self.response = response def expect(self, spawned, previous_answer=None): print(f'-> expecting next "{self.pattern.format(variable=previous_answer)}"') p = spawned.expect([pexpect.EOF, pexpect.TIMEOUT, self.get_pattern(previous_answer=previous_answer)]) if p not in [0, 1]: self.answer(spawned) else: print(f'-> before: {spawned.before}\n-> after: {spawned.after}\n-> {self.pattern}') def answer(self, spawned): print(f'-> answering "{self.response}" to "{spawned.after}"') if self.response is not None: spawned.sendline(self.response) if self.response in self.next: self.next[self.response].expect(spawned, previous_answer=self.response) elif None in self.next: self.next[None].expect(spawned, previous_answer=self.response) def is_the_one(self, context): #print(f'testing if it is the one {self.get_pattern()} for {context}') context = [l.strip() for l in context.strip().split('\r\n') if l.strip()] if not context: print('No context provided') return False context.reverse() for index, c in enumerate(self.context[len(self.context)-2::-1]): if c != ansi_escape.sub('', context[index]): return False return True def get_following_expectations(self): following = [] for next_expectation in set(self.next.values()): following.append(next_expectation) following.extend(next_expectation.get_following_expectations()) return following