diff --git a/oec/__main__.py b/oec/__main__.py index 2b33519..f659f43 100644 --- a/oec/__main__.py +++ b/oec/__main__.py @@ -20,16 +20,27 @@ IS_VT100_AVAILABLE = True from .keymap_3278_typewriter import KEYMAP as KEYMAP_3278_TYPEWRITER +from .keymap_3278_typewriter_de import KEYMAP as KEYMAP_3278_TYPEWRITER_DE from .keymap_ibm_typewriter import KEYMAP as KEYMAP_IBM_TYPEWRITER from .keymap_ibm_enhanced import KEYMAP as KEYMAP_IBM_ENHANCED -logging.basicConfig(level=logging.INFO) +_LOG_LEVELS = { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, +} logger = logging.getLogger('oec.main') -def _get_keymap(_args, keyboard_description): +KEYMAP_3278_LANGUAGE = { + 'us': KEYMAP_3278_TYPEWRITER, + 'de': KEYMAP_3278_TYPEWRITER_DE +} + +def _get_keymap(args, keyboard_description): if keyboard_description.startswith('3278'): - return KEYMAP_3278_TYPEWRITER + return KEYMAP_3278_LANGUAGE.get(args.keyboard_language, KEYMAP_3278_TYPEWRITER) if keyboard_description.startswith('IBM-TYPEWRITER'): return KEYMAP_IBM_TYPEWRITER @@ -71,11 +82,14 @@ def _create_device(args, interface, device_address, _poll_response): terminal = Terminal(interface, device_address, terminal_id, extended_id, features, keymap) + if args.clicker: + terminal.keyboard.clicker = True + return terminal def _create_session(args, device): if args.emulator == 'tn3270': - return TN3270Session(device, args.host, args.port, args.device_names, args.character_encoding, args.tn3270e_profile) + return TN3270Session(device, args.host, args.port, args.device_names, args.character_encoding, args.tn3270e_profile, args.ssl, args.no_starttls, args.ssl_no_verify, args.no_hostname_status) if args.emulator == 'vt100' and IS_VT100_AVAILABLE: host_command = [args.command, *args.command_args] @@ -88,6 +102,8 @@ def _create_session(args, device): def main(): args = parse_args(sys.argv[1:], IS_VT100_AVAILABLE) + logging.basicConfig(level=_LOG_LEVELS[args.log_level]) + def create_device(interface, device_address, poll_response): return _create_device(args, interface, device_address, poll_response) diff --git a/oec/args.py b/oec/args.py index 0b02183..868ac00 100644 --- a/oec/args.py +++ b/oec/args.py @@ -14,6 +14,17 @@ def parse_args(args, is_vt100_available): parser.add_argument('serial_port', help='serial port') + parser.add_argument('--log-level', choices=['debug', 'info', 'warning', 'error'], + default='info', dest='log_level', + help='logging level (default: info)') + + parser.add_argument('--keyboard-language', choices=['us', 'de'], default='us', + dest='keyboard_language', + help='keyboard national layout (default: us)') + + parser.add_argument('--clicker', action='store_true', default=False, + help='enable keyboard clicker') + subparsers = parser.add_subparsers(dest='emulator', required=True, description='emulator') @@ -28,6 +39,21 @@ def parse_args(args, is_vt100_available): dest='character_encoding', type=get_character_encoding, help='host EBCDIC code page') + tn3270_parser.add_argument('--ssl', action='store_true', default=False, + help='enable implicit SSL/TLS') + + tn3270_parser.add_argument('--no-starttls', action='store_true', default=False, + dest='no_starttls', + help='disable STARTTLS negotiation') + + tn3270_parser.add_argument('--ssl-no-verify', action='store_true', default=False, + dest='ssl_no_verify', + help='disable SSL/TLS certificate verification') + + tn3270_parser.add_argument('--no-hostname-status', action='store_true', default=False, + dest='no_hostname_status', + help='do not display host name on status line') + tn3270_parser.add_argument('--tn3270e', choices=['off', 'basic', 'default'], metavar='profile', default='default', dest='tn3270e_profile', diff --git a/oec/controller.py b/oec/controller.py index 551e75b..7c7a5d8 100644 --- a/oec/controller.py +++ b/oec/controller.py @@ -188,6 +188,13 @@ def _select_sessions(self, duration): if not self.session_selector.get_map(): return [] + # Check for sessions with SSL pending data first - select() won't + # report these as readable since the data is buffered in the SSL layer. + pending_sessions = [key.fileobj for key in self.session_selector.get_map().values() if key.fileobj.has_pending_data()] + + if pending_sessions: + return pending_sessions + selected = self.session_selector.select(duration) return [key.fileobj for (key, _) in selected] diff --git a/oec/display.py b/oec/display.py index aa6e49d..d1d5b2a 100644 --- a/oec/display.py +++ b/oec/display.py @@ -324,6 +324,14 @@ def _get_dirty_ranges(self): return [(self.dirty[0], self.dirty[-1])] CHAR_MAP = { + # GE/APL plane (0x40-0x7F) - multinational accented characters + 'ä': 0x50, + 'ö': 0x53, + 'ü': 0x54, + 'Ä': 0x70, + 'Ö': 0x73, + 'Ü': 0x74, + '>': 0x08, '<': 0x09, '[': 0x0a, diff --git a/oec/keyboard.py b/oec/keyboard.py index 11e2618..a3bd170 100644 --- a/oec/keyboard.py +++ b/oec/keyboard.py @@ -228,6 +228,17 @@ class Key(Enum): SLASH = ord('/') QUESTION = ord('?') + # German + ESZETT = ord('ß') + SECTION = ord('§') + CARET = ord('^') + LOWER_A_UMLAUT = ord('ä') + UPPER_A_UMLAUT = ord('Ä') + LOWER_O_UMLAUT = ord('ö') + UPPER_O_UMLAUT = ord('Ö') + LOWER_U_UMLAUT = ord('ü') + UPPER_U_UMLAUT = ord('Ü') + KEY_UPPER_MAP = { Key.LOWER_A: Key.UPPER_A, Key.LOWER_B: Key.UPPER_B, @@ -254,7 +265,10 @@ class Key(Enum): Key.LOWER_W: Key.UPPER_W, Key.LOWER_X: Key.UPPER_X, Key.LOWER_Y: Key.UPPER_Y, - Key.LOWER_Z: Key.UPPER_Z + Key.LOWER_Z: Key.UPPER_Z, + Key.LOWER_A_UMLAUT: Key.UPPER_A_UMLAUT, + Key.LOWER_O_UMLAUT: Key.UPPER_O_UMLAUT, + Key.LOWER_U_UMLAUT: Key.UPPER_U_UMLAUT } KEY_LOWER_MAP = {upper_key: lower_key for lower_key, upper_key in KEY_UPPER_MAP.items()} diff --git a/oec/keymap_3278_typewriter.py b/oec/keymap_3278_typewriter.py index 4b30298..9bcbd7e 100644 --- a/oec/keymap_3278_typewriter.py +++ b/oec/keymap_3278_typewriter.py @@ -93,7 +93,21 @@ 14: Key.UP, 19: Key.DOWN, 22: Key.LEFT, - 26: Key.RIGHT + 26: Key.RIGHT, + + # PF keys on APL keyboard + 64: Key.PF1, + 65: Key.PF2, + 66: Key.PF3, + 67: Key.PF4, + 68: Key.PF5, + 69: Key.PF6, + 70: Key.PF7, + 71: Key.PF8, + 72: Key.PF9, + 73: Key.PF10, + 74: Key.PF11, + 75: Key.PF12 } KEYMAP_SHIFT = { @@ -153,7 +167,21 @@ 108: Key.UPPER_M, 51: Key.COMMA, # TODO: Confirm this mapping 50: Key.CENTER_PERIOD, - 20: Key.QUESTION + 20: Key.QUESTION, + + # PF keys on APL keyboard + 64: Key.PF13, + 65: Key.PF14, + 66: Key.PF15, + 67: Key.PF16, + 68: Key.PF17, + 69: Key.PF18, + 70: Key.PF19, + 71: Key.PF20, + 72: Key.PF21, + 73: Key.PF22, + 74: Key.PF23, + 75: Key.PF24 } KEYMAP_ALT = { diff --git a/oec/keymap_3278_typewriter_de.py b/oec/keymap_3278_typewriter_de.py new file mode 100644 index 0000000..8b193d8 --- /dev/null +++ b/oec/keymap_3278_typewriter_de.py @@ -0,0 +1,66 @@ +""" +oec.keymap_3278_typewriter_de +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +""" + +from .keyboard import Key, Keymap +from .keymap_3278_typewriter import KEYMAP_DEFAULT as _US_DEFAULT, KEYMAP_SHIFT as _US_SHIFT, \ + KEYMAP_ALT, MODIFIER_RELEASE_MAP + +KEYMAP_DEFAULT = { + **_US_DEFAULT, + + # First Row + 48: Key.ESZETT, # was MINUS + 17: Key.SINGLE_QUOTE, # was EQUAL + + # Second Row + 120: Key.LOWER_Z, # was LOWER_Y + 27: Key.LOWER_U_UMLAUT, # was CENT + 21: Key.PLUS, # was BACKSLASH + + # Third Row + 126: Key.LOWER_O_UMLAUT, # was SEMICOLON + 18: Key.LOWER_A_UMLAUT, # was SINGLE_QUOTE + 15: Key.HASH, # was LEFT_BRACE + + # Fourth Row + 121: Key.LOWER_Y, # was LOWER_Z + 51: Key.COMMA, # unchanged + 50: Key.PERIOD, # unchanged + 20: Key.MINUS, # was SLASH +} + +KEYMAP_SHIFT = { + **_US_SHIFT, + + # First Row - number row + 33: Key.EXCLAMATION, # was BAR + 34: Key.DOUBLE_QUOTE, # was AT + 35: Key.SECTION, # was HASH + 38: Key.AMPERSAND, # was NOT + 39: Key.SLASH, # was AMPERSAND + 40: Key.LEFT_PAREN, # was ASTERISK + 41: Key.RIGHT_PAREN, # was LEFT_PAREN + 32: Key.EQUAL, # was RIGHT_PAREN + 48: Key.QUESTION, # was UNDERSCORE + 17: Key.BACKTICK, # was PLUS + + # Second Row + 120: Key.UPPER_Z, # was UPPER_Y + 27: Key.UPPER_U_UMLAUT, # was EXCLAMATION + 21: Key.ASTERISK, # was BROKEN_BAR + + # Third Row + 126: Key.UPPER_O_UMLAUT, # was COLON + 18: Key.UPPER_A_UMLAUT, # was DOUBLE_QUOTE + 15: Key.CARET, # was RIGHT_BRACE + + # Fourth Row + 121: Key.UPPER_Y, # was UPPER_Z + 51: Key.SEMICOLON, # was COMMA + 50: Key.COLON, # was CENTER_PERIOD + 20: Key.UNDERSCORE, # was QUESTION +} + +KEYMAP = Keymap('3278 Typewriter (DE)', KEYMAP_DEFAULT, KEYMAP_SHIFT, KEYMAP_ALT, MODIFIER_RELEASE_MAP) diff --git a/oec/session.py b/oec/session.py index a876f32..35e0b19 100644 --- a/oec/session.py +++ b/oec/session.py @@ -11,6 +11,9 @@ def terminate(self): def fileno(self): raise NotImplementedError + def has_pending_data(self): + return False + def handle_host(self): raise NotImplementedError diff --git a/oec/terminal.py b/oec/terminal.py index 68215cb..8840c44 100644 --- a/oec/terminal.py +++ b/oec/terminal.py @@ -3,10 +3,14 @@ ~~~~~~~~~~~~ """ +import logging + from coax import LoadControlRegister, Feature, PollAction, Control from .device import Device, UnsupportedDeviceError from .display import Dimensions, BufferedDisplay + +logger = logging.getLogger(__name__) from .keyboard import Keyboard MODEL_DIMENSIONS = { @@ -50,8 +54,9 @@ def setup(self): self.display.clear(clear_status_line=True) - # Show the attached indicator on the status line. - self.display.status_line.write_string(0, 'OEC') + # Show the attached indicator on the status line, padded to full width. + columns = self.display.status_line.columns + self.display.status_line.write_string(0, 'OEC'.ljust(columns)) self.display.move_cursor(row=0, column=0) @@ -62,6 +67,7 @@ def get_poll_action(self): # Convert a queued alarm or keyboard clicker change to POLL action. if self.alarm: poll_action = PollAction.ALARM + logger.debug('Alarm delivered via POLL') self.alarm = False elif self.keyboard.clicker != self.last_poll_keyboard_clicker: @@ -76,6 +82,7 @@ def get_poll_action(self): def sound_alarm(self): """Queue an alarm on next POLL command.""" + logger.debug('Alarm queued') self.alarm = True def load_control_register(self): diff --git a/oec/tn3270.py b/oec/tn3270.py index 6827d80..fcb8318 100644 --- a/oec/tn3270.py +++ b/oec/tn3270.py @@ -4,6 +4,7 @@ """ import logging +import ssl from tn3270 import Telnet, TN3270EFunction, Emulator, AttributeCell, CharacterCell, AID, Color, \ Highlight, OperatorError, ProtectedCellOperatorError, FieldOverflowOperatorError from tn3270.ebcdic import DUP, FM @@ -47,7 +48,7 @@ class TN3270Session(Session): """TN3270 session.""" - def __init__(self, terminal, host, port, device_names, character_encoding, tn3270e_profile): + def __init__(self, terminal, host, port, device_names, character_encoding, tn3270e_profile, ssl_enabled=False, no_starttls=False, ssl_no_verify=False, no_hostname_status=False): super().__init__(terminal) self.logger = logging.getLogger(__name__) @@ -57,6 +58,10 @@ def __init__(self, terminal, host, port, device_names, character_encoding, tn327 self.device_names = device_names self.character_encoding = character_encoding self.tn3270e_profile = tn3270e_profile + self.ssl_enabled = ssl_enabled + self.no_starttls = no_starttls + self.ssl_no_verify = ssl_no_verify + self.no_hostname_status = no_hostname_status self.telnet = None self.emulator = None @@ -72,6 +77,11 @@ def __init__(self, terminal, host, port, device_names, character_encoding, tn327 def start(self): self._connect_host() + self._write_security_status() + + if not self.no_hostname_status: + self._write_hostname_status() + (rows, columns) = self.terminal.display.dimensions if self.terminal.display.has_eab: @@ -94,6 +104,10 @@ def terminate(self): def fileno(self): return self.emulator.stream.socket.fileno() + def has_pending_data(self): + sock = self.emulator.stream.socket + return isinstance(sock, ssl.SSLSocket) and sock.pending() > 0 + def handle_host(self): try: if not self.emulator.update(timeout=0): @@ -196,13 +210,49 @@ def _connect_host(self): self.telnet = Telnet(terminal_type, **tn3270e_args) - self.telnet.open(self.host, self.port, self.device_names) + ssl_args = {} + starttls_enabled = not self.ssl_enabled and not self.no_starttls + + if self.ssl_enabled or starttls_enabled: + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + + if self.ssl_no_verify: + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + else: + ssl_context.load_default_certs() + + if self.ssl_enabled: + ssl_args['ssl_context'] = ssl_context + ssl_args['ssl_server_hostname'] = self.host + else: + ssl_args['starttls_ssl_context'] = ssl_context + ssl_args['starttls_server_hostname'] = self.host + + self.telnet.open(self.host, self.port, self.device_names, **ssl_args) if self.telnet.is_tn3270e_negotiated: self.logger.info(f'TN3270E mode negotiated: Device Type = {self.telnet.device_type}, Device Name = {self.telnet.device_name}, Functions = {self.telnet.tn3270e_functions}') else: self.logger.debug('Unable to negotiate TN3270E mode') + def _write_security_status(self): + is_secure = isinstance(self.telnet.socket, ssl.SSLSocket) + + self.terminal.display.status_line.write_string(17, 'encrypted' if is_secure else 'unencrypted') + + def _write_hostname_status(self): + status_column = 46 + columns = self.terminal.display.status_line.columns + max_length = columns - status_column + + if self.port == 23: + text = self.host + else: + text = f'{self.host}:{self.port}' + + self.terminal.display.status_line.write_string(status_column, text[:max_length].rjust(max_length)) + def _disconnect_host(self): self.telnet.close() diff --git a/requirements.txt b/requirements.txt index 6c98492..ac23bf3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ ptyprocess==0.7.0 pycoax==0.11.2 pyserial==3.5 pyte==0.8.1 -pytn3270==0.16.0 +pytn3270 @ git+https://github.com/hanshuebner/pytn3270.git@master sliplib==0.6.2 sortedcontainers==2.4.0 telnetlib3==2.0.4 diff --git a/setup-udev.sh b/setup-udev.sh new file mode 100755 index 0000000..9b76fc4 --- /dev/null +++ b/setup-udev.sh @@ -0,0 +1,13 @@ +#!/bin/bash +set -euo pipefail + +RULE='SUBSYSTEM=="tty", ATTRS{serial}=="e6613852831c9e32", SYMLINK+="ttyCoax"' +RULE_FILE="/etc/udev/rules.d/99-coax-interface.rules" + +echo "$RULE" | sudo tee "$RULE_FILE" > /dev/null +sudo udevadm control --reload-rules +sudo udevadm trigger --subsystem-match=tty + +echo "Rule installed at $RULE_FILE" +echo "Symlink /dev/ttyCoax should now be active:" +ls -l /dev/ttyCoax diff --git a/tests/test_args.py b/tests/test_args.py index 0e5cdf9..095e5a5 100644 --- a/tests/test_args.py +++ b/tests/test_args.py @@ -85,3 +85,28 @@ def test_tn3270_invalid_deprecated_port(self): self.parser_error.assert_called_once() self.assertEqual(self.parser_error.call_args.args[0], f'argument port: invalid port: {port}') + + def test_tn3270_ssl_default(self): + args = parse_args(['/dev/ttyACM0', 'tn3270', 'host'], False) + + self.assertFalse(args.ssl) + self.assertFalse(args.no_starttls) + self.assertFalse(args.ssl_no_verify) + + def test_tn3270_ssl(self): + args = parse_args(['/dev/ttyACM0', 'tn3270', '--ssl', 'host:992'], False) + + self.assertTrue(args.ssl) + self.assertFalse(args.ssl_no_verify) + + def test_tn3270_ssl_no_verify(self): + args = parse_args(['/dev/ttyACM0', 'tn3270', '--ssl', '--ssl-no-verify', 'host:992'], False) + + self.assertTrue(args.ssl) + self.assertTrue(args.ssl_no_verify) + + def test_tn3270_no_starttls(self): + args = parse_args(['/dev/ttyACM0', 'tn3270', '--no-starttls', 'host:23'], False) + + self.assertFalse(args.ssl) + self.assertTrue(args.no_starttls) diff --git a/tests/test_terminal.py b/tests/test_terminal.py index c517a2f..cecabcc 100644 --- a/tests/test_terminal.py +++ b/tests/test_terminal.py @@ -39,6 +39,7 @@ def setUp(self): self.terminal.display = create_autospec(Display, instance=True) self.terminal.display.status_line = create_autospec(StatusLine, instance=True) + self.terminal.display.status_line.columns = 80 def test(self): self.terminal.setup()