Compare commits
2 Commits
d6bbcaa65c
...
efb8f22872
Author | SHA1 | Date | |
---|---|---|---|
efb8f22872 | |||
db8bdf4a6d |
@ -20,10 +20,10 @@ class Controller:
|
||||
version, module, message = uri.split('.', 2)
|
||||
uri = module + '.' + message
|
||||
if module not in self.risotto_modules:
|
||||
return await remote.call_or_publish(module,
|
||||
version,
|
||||
message,
|
||||
kwargs)
|
||||
return await remote.remove_call(module,
|
||||
version,
|
||||
message,
|
||||
kwargs)
|
||||
return await dispatcher.call(version,
|
||||
uri,
|
||||
risotto_context,
|
||||
@ -37,14 +37,15 @@ class Controller:
|
||||
version, module, submessage = uri.split('.', 2)
|
||||
version, message = uri.split('.', 1)
|
||||
if module not in self.risotto_modules:
|
||||
await remote.call_or_publish(module,
|
||||
version,
|
||||
submessage,
|
||||
kwargs)
|
||||
await dispatcher.publish(version,
|
||||
message,
|
||||
risotto_context,
|
||||
**kwargs)
|
||||
await remote.remove_call(module,
|
||||
version,
|
||||
submessage,
|
||||
kwargs)
|
||||
else:
|
||||
await dispatcher.publish(version,
|
||||
message,
|
||||
risotto_context,
|
||||
**kwargs)
|
||||
|
||||
async def on_join(self,
|
||||
risotto_context):
|
||||
|
@ -134,7 +134,10 @@ class PublishDispatcher:
|
||||
version,
|
||||
message,
|
||||
'event')
|
||||
function_objs = self.messages[version][message].get('functions', [])
|
||||
try:
|
||||
function_objs = self.messages[version][message].get('functions', [])
|
||||
except KeyError:
|
||||
raise ValueError(_(f'cannot find message {version}.{message}'))
|
||||
# do not start a new database connection
|
||||
if hasattr(old_risotto_context, 'connection'):
|
||||
risotto_context.connection = old_risotto_context.connection
|
||||
|
@ -1,5 +1,8 @@
|
||||
from typing import Dict, Any
|
||||
from json import dumps
|
||||
from asyncpg.exceptions import UndefinedTableError
|
||||
|
||||
|
||||
from .context import Context
|
||||
from .utils import _
|
||||
from .config import get_config
|
||||
@ -23,7 +26,10 @@ class Logger:
|
||||
args.append(dumps(data))
|
||||
|
||||
sql = insert + ') ' + values + ')'
|
||||
await risotto_context.connection.fetch(sql, *args)
|
||||
try:
|
||||
await risotto_context.connection.fetch(sql, *args)
|
||||
except UndefinedTableError as err:
|
||||
raise Exception(_(f'cannot access to database ({err}), was the database really created?'))
|
||||
|
||||
def _get_message_paths(self,
|
||||
risotto_context: Context):
|
||||
|
@ -1,4 +1,3 @@
|
||||
from collections import OrderedDict
|
||||
from os.path import join, basename, dirname
|
||||
from glob import glob
|
||||
|
||||
@ -9,8 +8,11 @@ from tiramisu import StrOption, IntOption, BoolOption, ChoiceOption, OptionDescr
|
||||
from yaml import load, SafeLoader
|
||||
from os import listdir
|
||||
from os.path import isfile
|
||||
from ..config import get_config
|
||||
from ..utils import _
|
||||
|
||||
|
||||
from .config import get_config
|
||||
from .utils import _
|
||||
|
||||
|
||||
MESSAGE_ROOT_PATH = get_config()['global']['message_root_path']
|
||||
CUSTOMTYPES = {}
|
||||
@ -42,14 +44,16 @@ class MessageDefinition:
|
||||
A MessageDefinition is a representation of a message in the Zephir application messaging context
|
||||
"""
|
||||
__slots__ = ('version',
|
||||
'uri',
|
||||
'message',
|
||||
'description',
|
||||
'parameters',
|
||||
'default_roles',
|
||||
'errors',
|
||||
'pattern',
|
||||
'related',
|
||||
'response')
|
||||
'response',
|
||||
'options',
|
||||
)
|
||||
|
||||
def __init__(self,
|
||||
raw_def,
|
||||
@ -57,17 +61,18 @@ class MessageDefinition:
|
||||
message):
|
||||
# default value for non mandatory key
|
||||
self.version = version
|
||||
self.parameters = OrderedDict()
|
||||
self.parameters = {}
|
||||
self.errors = []
|
||||
self.related = []
|
||||
self.default_roles = []
|
||||
self.response = None
|
||||
self.uri = message
|
||||
self.message = message
|
||||
self.options = None
|
||||
|
||||
# loads yaml information into object
|
||||
for key, value in raw_def.items():
|
||||
if key == 'uri':
|
||||
raise Exception('uri in not allowed in message')
|
||||
if key == 'message':
|
||||
raise Exception('message in not allowed in message')
|
||||
if isinstance(value, str):
|
||||
value = value.strip()
|
||||
if key == 'pattern':
|
||||
@ -83,8 +88,10 @@ class MessageDefinition:
|
||||
elif key == 'response':
|
||||
value = ResponseDefinition(value,
|
||||
self.version)
|
||||
elif key == 'errors':
|
||||
value = _parse_error_definition(value)
|
||||
elif key == 'description':
|
||||
value = value.strip().rstrip()
|
||||
if value.endswith('.'):
|
||||
value = value[:-1]
|
||||
setattr(self, key, value)
|
||||
# check mandatory keys
|
||||
for key in self.__slots__:
|
||||
@ -130,6 +137,9 @@ class ParameterDefinition:
|
||||
else:
|
||||
self._valid_type(value)
|
||||
#self._valid_type(value)
|
||||
elif key == 'description':
|
||||
if value.endswith('.'):
|
||||
value = value[:-1]
|
||||
setattr(self, key, value)
|
||||
# check mandatory keys
|
||||
for key in self.__slots__:
|
||||
@ -182,6 +192,9 @@ class ResponseDefinition:
|
||||
value = CUSTOMTYPES[version][value].type
|
||||
else:
|
||||
self._valid_type(value)
|
||||
elif key == 'description':
|
||||
if value.endswith('.'):
|
||||
value = value[:-1]
|
||||
setattr(self, key, value)
|
||||
# check mandatory keys
|
||||
for key in self.__slots__:
|
||||
@ -197,29 +210,9 @@ class ResponseDefinition:
|
||||
raise Exception(_('unknown parameter type: {}').format(typ))
|
||||
|
||||
|
||||
class ErrorDefinition:
|
||||
"""
|
||||
An ErrorDefinition is a representation of an error in the Zephir application messaging context
|
||||
"""
|
||||
__slots__ = ('uri',)
|
||||
|
||||
def __init__(self, raw_err):
|
||||
extra_keys = set(raw_err) - set(self.__slots__)
|
||||
if extra_keys:
|
||||
raise Exception(_('extra keys for errors: {}').format(extra_keys))
|
||||
self.uri = raw_err['uri']
|
||||
|
||||
|
||||
def _parse_error_definition(raw_defs):
|
||||
new_value = []
|
||||
for raw_err in raw_defs:
|
||||
new_value.append(ErrorDefinition(raw_err))
|
||||
return new_value
|
||||
|
||||
|
||||
def _parse_parameters(raw_defs,
|
||||
version):
|
||||
parameters = OrderedDict()
|
||||
parameters = {}
|
||||
for name, raw_def in raw_defs.items():
|
||||
parameters[name] = ParameterDefinition(name,
|
||||
version,
|
||||
@ -228,7 +221,8 @@ def _parse_parameters(raw_defs,
|
||||
|
||||
|
||||
def get_message(uri: str,
|
||||
current_module_names: str):
|
||||
current_module_names: str,
|
||||
) -> MessageDefinition:
|
||||
try:
|
||||
version, message = uri.split('.', 1)
|
||||
path = get_message_file_path(version,
|
||||
@ -299,6 +293,9 @@ class CustomParam:
|
||||
value = self._convert_type(value, raw_def)
|
||||
elif key == 'items':
|
||||
continue
|
||||
elif key == 'description':
|
||||
if value.endswith('.'):
|
||||
value = value[:-1]
|
||||
setattr(self, key, value)
|
||||
|
||||
# check mandatory keys
|
||||
@ -331,7 +328,7 @@ class CustomParam:
|
||||
|
||||
|
||||
def _parse_custom_params(raw_defs, required):
|
||||
parameters = OrderedDict()
|
||||
parameters = {}
|
||||
for name, raw_def in raw_defs.items():
|
||||
parameters[name] = CustomParam(name, raw_def, required)
|
||||
return parameters
|
||||
@ -357,6 +354,9 @@ class CustomType:
|
||||
value = self._convert_type(value, raw_def)
|
||||
elif key == 'properties':
|
||||
value = _parse_custom_params(value, raw_def.get('required', {}))
|
||||
elif key == 'description':
|
||||
if value.endswith('.'):
|
||||
value = value[:-1]
|
||||
|
||||
setattr(self, key, value)
|
||||
# check mandatory keys
|
||||
@ -435,12 +435,13 @@ def _get_option(name,
|
||||
props = []
|
||||
if not hasattr(arg, 'default'):
|
||||
props.append('mandatory')
|
||||
props.append(Calculation(calc_value,
|
||||
Params(ParamValue('disabled'),
|
||||
kwargs={'condition': ParamOption(select_option, todict=True),
|
||||
'expected': ParamValue(optiondescription),
|
||||
'reverse_condition': ParamValue(True)}),
|
||||
calc_value_property_help))
|
||||
if select_option:
|
||||
props.append(Calculation(calc_value,
|
||||
Params(ParamValue('disabled'),
|
||||
kwargs={'condition': ParamOption(select_option, todict=True),
|
||||
'expected': ParamValue(optiondescription),
|
||||
'reverse_condition': ParamValue(True)}),
|
||||
calc_value_property_help))
|
||||
|
||||
description = arg.description.strip().rstrip()
|
||||
kwargs = {'name': name,
|
||||
@ -472,7 +473,6 @@ def _get_option(name,
|
||||
|
||||
def get_options(message_def,
|
||||
file_path,
|
||||
needs,
|
||||
select_option,
|
||||
optiondescription,
|
||||
load_shortarg):
|
||||
@ -496,7 +496,7 @@ def _parse_responses(message_def,
|
||||
"""build option with returns
|
||||
"""
|
||||
if message_def.response.parameters is None:
|
||||
raise Exception('uri "{}" did not returned any valid parameters.'.format(message_def.uri))
|
||||
raise Exception('message "{}" did not returned any valid parameters.'.format(message_def.message))
|
||||
|
||||
options = []
|
||||
names = []
|
||||
@ -518,41 +518,21 @@ def _parse_responses(message_def,
|
||||
# FIXME
|
||||
'File': StrOption}.get(type_)
|
||||
if not option:
|
||||
raise Exception(f'unknown param type {obj.type} in responses of message {message_def.uri}')
|
||||
raise Exception(f'unknown param type {obj.type} in responses of message {message_def.message}')
|
||||
if hasattr(obj, 'default'):
|
||||
kwargs['default'] = obj.default
|
||||
else:
|
||||
kwargs['properties'] = ('mandatory',)
|
||||
options.append(option(**kwargs))
|
||||
od = OptionDescription(message_def.uri,
|
||||
od = OptionDescription(message_def.message,
|
||||
message_def.response.description,
|
||||
options)
|
||||
od.impl_set_information('multi', message_def.response.multi)
|
||||
return od
|
||||
|
||||
|
||||
def _getoptions_from_yml(message_def,
|
||||
optiondescriptions,
|
||||
file_path,
|
||||
needs,
|
||||
select_option,
|
||||
load_shortarg):
|
||||
if message_def.pattern == 'event' and message_def.response:
|
||||
raise Exception('event with response?: {}'.format(file_path))
|
||||
if message_def.pattern == 'rpc' and not message_def.response:
|
||||
print('rpc without response?: {}'.format(file_path))
|
||||
options = get_options(message_def,
|
||||
file_path,
|
||||
needs,
|
||||
select_option,
|
||||
message_def.uri,
|
||||
load_shortarg)
|
||||
name = message_def.uri
|
||||
description = message_def.description.strip().rstrip()
|
||||
optiondescriptions[name] = (description, options)
|
||||
|
||||
|
||||
def _get_root_option(select_option, optiondescriptions):
|
||||
def _get_root_option(select_option,
|
||||
optiondescriptions):
|
||||
"""get root option
|
||||
"""
|
||||
def _get_od(curr_ods):
|
||||
@ -600,7 +580,6 @@ def get_messages(current_module_names,
|
||||
global CUSTOMTYPES
|
||||
optiondescriptions = {}
|
||||
optiondescriptions_info = {}
|
||||
needs = {}
|
||||
messages = list(list_messages(uris,
|
||||
current_module_names,
|
||||
current_version))
|
||||
@ -618,25 +597,29 @@ def get_messages(current_module_names,
|
||||
load_customtypes(listdir(join(MESSAGE_ROOT_PATH, version)))
|
||||
else:
|
||||
load_customtypes(current_module_names)
|
||||
for message_name in messages:
|
||||
message_def = get_message(message_name,
|
||||
current_module_names)
|
||||
optiondescriptions_info[message_def.uri] = {'pattern': message_def.pattern,
|
||||
'default_roles': message_def.default_roles,
|
||||
'version': message_name.split('.')[0]}
|
||||
for uri in messages:
|
||||
message_def = get_message(uri,
|
||||
current_module_names,
|
||||
)
|
||||
optiondescriptions_info[message_def.message] = {'pattern': message_def.pattern,
|
||||
'default_roles': message_def.default_roles,
|
||||
'version': message_def.version}
|
||||
if message_def.pattern == 'rpc':
|
||||
optiondescriptions_info[message_def.uri]['response'] = _parse_responses(message_def,
|
||||
message_name)
|
||||
if not message_def.response:
|
||||
raise Exception(f'rpc without response is not allowed {uri}')
|
||||
optiondescriptions_info[message_def.message]['response'] = _parse_responses(message_def,
|
||||
uri)
|
||||
elif message_def.response:
|
||||
raise Exception(f'response not allowed for {message_def.uri}')
|
||||
_getoptions_from_yml(message_def,
|
||||
optiondescriptions,
|
||||
message_name,
|
||||
needs,
|
||||
select_option,
|
||||
load_shortarg)
|
||||
raise Exception(f'response is not allowed for {uri}')
|
||||
message_def.options = get_options(message_def,
|
||||
uri,
|
||||
select_option,
|
||||
message_def.message,
|
||||
load_shortarg)
|
||||
optiondescriptions[message_def.message] = (message_def.description, message_def.options)
|
||||
|
||||
root = _get_root_option(select_option, optiondescriptions)
|
||||
root = _get_root_option(select_option,
|
||||
optiondescriptions)
|
||||
if current_module_names is None:
|
||||
CUSTOMTYPES = {}
|
||||
return optiondescriptions_info, root
|
@ -1 +0,0 @@
|
||||
from .message import get_messages
|
@ -5,18 +5,19 @@ from tiramisu_api import Config
|
||||
|
||||
|
||||
from .config import get_config
|
||||
from .utils import _
|
||||
#
|
||||
#
|
||||
# ALLOW_INSECURE_HTTPS = get_config()['submodule']['allow_insecure_https']
|
||||
# ALLOW_INSECURE_HTTPS = get_config()['module']['allow_insecure_https']
|
||||
|
||||
|
||||
class Remote:
|
||||
submodules = {}
|
||||
|
||||
async def _get_config(self,
|
||||
submodule: str,
|
||||
module: str,
|
||||
url: str) -> None:
|
||||
if submodule not in self.submodules:
|
||||
if module not in self.submodules:
|
||||
session = ClientSession()
|
||||
async with session.get(url) as resp:
|
||||
if resp.status != 200:
|
||||
@ -27,22 +28,25 @@ class Remote:
|
||||
err = await resp.text()
|
||||
raise Exception(err)
|
||||
json = await resp.json()
|
||||
self.submodules[submodule] = json
|
||||
return Config(self.submodules[submodule])
|
||||
self.submodules[module] = json
|
||||
return Config(self.submodules[module])
|
||||
|
||||
async def remove_call(self,
|
||||
submodule: str,
|
||||
module: str,
|
||||
version: str,
|
||||
message: str,
|
||||
submessage: str,
|
||||
payload) -> dict:
|
||||
domain_name = get_config()['submodule'][submodule]
|
||||
try:
|
||||
domain_name = get_config()['module'][module]
|
||||
except KeyError:
|
||||
raise ValueError(_(f'cannot find information of remote module "{module}" to access to "{version}.{module}.{submessage}"'))
|
||||
remote_url = f'http://{domain_name}:8080/api/{version}'
|
||||
message_url = f'{remote_url}/{message}'
|
||||
message_url = f'{remote_url}/{submessage}'
|
||||
|
||||
config = await self._get_config(submodule,
|
||||
config = await self._get_config(module,
|
||||
remote_url)
|
||||
for key, value in payload.items():
|
||||
path = message + '.' + key
|
||||
path = submessage + '.' + key
|
||||
config.option(path).value.set(value)
|
||||
session = ClientSession()
|
||||
async with session.post(message_url, data=dumps(payload)) as resp:
|
||||
|
Loading…
x
Reference in New Issue
Block a user