#!/usr/bin/env python
from __future__ import unicode_literals
import html
import json
import logging
import sys
import six
from prompt_toolkit import PromptSession, HTML
from prompt_toolkit.eventloop import Future, ensure_future, Return, From
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import Float, HSplit
from prompt_toolkit.layout.dimension import D
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from prompt_toolkit.widgets import Dialog, Button, TextArea
from pygments.lexers.data import YamlLexer, JsonLexer
from pygments.styles import get_style_by_name
from .completers import CompleteFromDocumentKeys
from .validators import YAMLValidator, JSONValidator
from .. import __application__
from ..utils import yaml
from ..utils.yaml import ParserError
__author__ = "Giuseppe Chiesa"
__copyright__ = "Copyright 2017-2021, Giuseppe Chiesa"
__credits__ = ["Giuseppe Chiesa"]
__license__ = "BSD"
__maintainer__ = "Giuseppe Chiesa"
__email__ = "mail@giuseppechiesa.it"
__status__ = "PerpetualBeta"
DEFAULT_STYLE = 'native'
SHORTCUTS_HELP = [
'CTRL+s : Search forward (arrows for prev/next)',
'CTRL+r : Search backwards (arrows for prev/next)',
'CTRL+x + CTRL+u : Undo',
'CTRL+SPACE : Start Selection',
'CTRL+w : Cut Selection',
'ESC+w : Copy Selection',
'CTRL+y : Paste Selection',
'CTRL+k : Delete until end of line',
'CTRL+c : Exit without saving'
]
[docs]class EditorException(Exception):
pass
[docs]class EditorAbortException(Exception):
pass
[docs]class MessageDialog(object):
def __init__(self, title, text):
self.future = Future()
def set_done():
self.future.set_result(None)
ok_button = Button(text='OK', handler=set_done)
self.dialog = Dialog(
title=title,
body=HSplit([
TextArea(text=text, scrollbar=True, read_only=True),
]),
buttons=[ok_button],
width=D(preferred=80),
modal=True)
def __pt_container__(self):
return self.dialog
[docs]class Editor(object):
SUPPORTED_MODE = ('json', 'yaml')
VALIDATORS = {
'yaml': YAMLValidator,
'json': JSONValidator
}
LEXERS = {
'yaml': YamlLexer,
'json': JsonLexer
}
def __init__(self, json_data, attributes=None, mode='json'):
self.logger = logging.getLogger('{a}.{m}'.format(a=__application__, m=self.__class__.__name__))
self._data = json_data
self._mode = mode
self._bindings = None
self._completer = None
self._attributes = attributes
self._result = None
if self._mode not in self.SUPPORTED_MODE:
raise EditorException('Invalid editor mode. Supported mode: {}'.format(self.SUPPORTED_MODE))
try:
json.loads(self._data)
except Exception as e:
raise EditorException('Invalid data. Data is:\n{data}\n Error: {t}. Message: '
'{e}'.format(data=self._data, t=str(type(e)), e=str(e)))
def _load_key_bindings(self):
self._bindings = KeyBindings()
@self._bindings.add('tab')
def tab_event(event):
event.app.current_buffer.insert_text(' ')
@self._bindings.add('f1')
def f1_event(event):
title = 'Shortcuts Help'
text = '\n'.join(SHORTCUTS_HELP)
self._show_message(event.app, title=title, text=text)
def _show_message(self, application, title, text):
def coroutine():
dialog = MessageDialog(title, text)
yield From(self._show_dialog_as_float(application, dialog))
ensure_future(coroutine())
@staticmethod
def _show_dialog_as_float(application, dialog):
"""
Coroutine
:param application:
:param dialog:
:return:
"""
float_ = Float(content=dialog)
float_list = application.layout.container.children[0].floats
float_list.insert(0, float_)
focused_before = application.layout.current_window
application.layout.focus(dialog)
result = yield dialog.future
application.layout.focus(focused_before)
if float_ in float_list:
float_list.remove(float_)
raise Return(result)
[docs] def bottom_bar(self):
data = [
'<b>{}</b>'.format(html.escape('<ESC>+<Enter> : save and exit')),
'<b>{}</b>'.format(html.escape('<F1> : help'))
]
if self._attributes.get('config', None):
data += ['<b>Configuration</b>: {}'.format(html.escape(self._attributes['config']))]
if self._attributes.get('bucket', None):
data += ['<b>Bucket</b>: {}'.format(html.escape(self._attributes['bucket']))]
if self._attributes.get('path', None):
data += ['<b>Path</b>: {}'.format(html.escape(self._attributes['path']))]
if self._attributes.get('debug', None):
data += ['<b>Debug</b>: {}'.format(html.escape(self._attributes['debug']))]
text = ' - '.join(data)
return HTML(six.text_type(text))
@property
def data(self):
tmp = json.loads(self._data)
if self._mode == 'yaml':
text = yaml.write_to_string(tmp)
elif self._mode == 'json':
text = json.dumps(tmp, indent=4, separators=(',', ': '))
else:
raise EditorException('Invalid editor mode')
# ensure there is always room for the help dialog
padding_bottom = (len(SHORTCUTS_HELP) + 6) - len(text.splitlines())
if padding_bottom:
text = '{text}{padding}'.format(text=text, padding='\n' * padding_bottom)
return text
@property
def validator_class(self):
return self.VALIDATORS.get(self._mode)
@property
def lexer_class(self):
return self.LEXERS.get(self._mode)
def _validate(self, result):
try:
if self._mode == 'yaml':
return yaml.load_from_stream(result)
elif self._mode == 'json':
return json.loads(result)
except ParserError:
raise EditorException('Invalid YAML produced. Nothing will be saved')
except ValueError:
raise EditorException('Invalid JSON produced. Nothing will be saved')
[docs] def run(self):
self._load_key_bindings()
style = style_from_pygments_cls(get_style_by_name(DEFAULT_STYLE))
session = PromptSession(multiline=True,
lexer=PygmentsLexer(self.lexer_class),
validator=self.validator_class(),
bottom_toolbar=self.bottom_bar,
mouse_support=False,
style=style,
include_default_pygments_style=False,
validate_while_typing=True,
key_bindings=self._bindings,
completer=CompleteFromDocumentKeys(bottom_toolbar_attributes=self._attributes,
mode=self._mode),
# auto_suggest=AutosuggestFromDocumentData(bottom_toolbar_attributes=self._attributes),
complete_while_typing=True,
)
try:
result = session.prompt('', default=six.text_type(self.data))
except KeyboardInterrupt:
self.logger.debug('Editing aborted.')
raise EditorAbortException
self._result = self._validate(result)
@property
def result(self):
if self._result:
return json.dumps(self._result)
return ''
if __name__ == '__main__':
# edit a file
filename = sys.argv[1]
with open(filename, 'rb') as fh:
data = fh.read().decode()
editor = Editor(data, attributes={'debug': 'enabled'}, mode=sys.argv[2])
editor.run()
print('Result:\n---\n{}'.format(editor.result))