vt102 module
vt102 is an in memory vt1xx terminal emulator. It supports all the most common terminal escape sequences, including text attributes and color.
It's an in memory vt1XX-compatible terminal emulator. The XX stands for a series video terminals, developed by DEC between 1970 and 1995. The first, and most famous one, was VT100 terminal, which is now a de-facto standard for all virtual terminal emulators. vt102 is one such emulator.
Why would you want to use a terminal emulator?
- Screen scraping some terminal or curses app.
- Writing your own graphical terminal emulator.
- ... seriously, that's about it.
Here's a quick example:
>>> from vt102 import screen, stream >>> st = stream() >>> sc = screen((10, 10)) >>> print(sc) [" ", " ", " ", " ", " ", " ", " ", " ", " ", " "] >>> sc.attach(st) >>> st.process("Text goes here") >>> print(sc) ["Text goes ", "here ", " ", " ", " ", " ", " ", " ", " ", " "] >>> st.process("\x1b[H\x1b[K") >>> print(sc) [" ", "here ", " ", " ", " ", " ", " ", " ", " ", " "]
Here's a real-world example:
>>> import vt102 >>> stream = vt102.stream() >>> screen = vt102.screen((24,80)) >>> screen.attach(stream) >>> stream.process(u"\u001b7\u001b[?47h\u001b)0\u001b[H\u001b[2J\u001b[H" + ... u"\u001b[2;1HNetHack, Copyright 1985-2003\r\u001b[3;1" + ... u"H By Stichting Mathematisch Centrum and M. " + ... u"Stephenson.\r\u001b[4;1H See license for de" + ... u"tails.\r\u001b[5;1H\u001b[6;1H\u001b[7;1HShall I pi" + ... u"ck a character's race, role, gender and alignment f" + ... u"or you? [ynq] ") >>> print(screen) [" ", "NetHack, Copyright 1985-2003 ", " By Stichting Mathematisch Centrum and M. Stephenson. ", " See license for details. ", " ", " ", "Shall I pick a character's race, role, gender and alignment for you? [ynq] ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " "]
""" [vt102](http://github.com/samfoo/vt102) is an in memory vt1xx terminal emulator. It supports all the most common terminal escape sequences, including text attributes and color. It's an in memory vt1XX-compatible terminal emulator. The *XX* stands for a series video terminals, developed by [DEC](http://en.wikipedia.org/wiki/Digital_Equipment_Corporation) between 1970 and 1995. The first, and most famous one, was VT100 terminal, which is now a de-facto standard for all virtual terminal emulators. [vt102](http://github.com/samfoo/vt102) is one such emulator. Why would you want to use a terminal emulator? * Screen scraping some terminal or curses app. * Writing your own graphical terminal emulator. * ... seriously, that's about it. Here's a quick example: >>> from vt102 import screen, stream >>> st = stream() >>> sc = screen((10, 10)) >>> print(sc) [" ", " ", " ", " ", " ", " ", " ", " ", " ", " "] >>> sc.attach(st) >>> st.process("Text goes here") >>> print(sc) ["Text goes ", "here ", " ", " ", " ", " ", " ", " ", " ", " "] >>> st.process("\\x1b[H\\x1b[K") >>> print(sc) [" ", "here ", " ", " ", " ", " ", " ", " ", " ", " "] Here's a real-world example: >>> import vt102 >>> stream = vt102.stream() >>> screen = vt102.screen((24,80)) >>> screen.attach(stream) >>> stream.process(u"\\u001b7\\u001b[?47h\\u001b)0\\u001b[H\\u001b[2J\\u001b[H" + ... u"\\u001b[2;1HNetHack, Copyright 1985-2003\\r\\u001b[3;1" + ... u"H By Stichting Mathematisch Centrum and M. " + ... u"Stephenson.\\r\\u001b[4;1H See license for de" + ... u"tails.\\r\\u001b[5;1H\\u001b[6;1H\\u001b[7;1HShall I pi" + ... u"ck a character's race, role, gender and alignment f" + ... u"or you? [ynq] ") >>> print(screen) [" ", "NetHack, Copyright 1985-2003 ", " By Stichting Mathematisch Centrum and M. Stephenson. ", " See license for details. ", " ", " ", "Shall I pick a character's race, role, gender and alignment for you? [ynq] ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " ", " "] >>> """ import string import codecs from copy import copy from .graphics import text, colors from . import control as ctrl, escape as esc # from .control import * # from .escape import * class StreamProcessError(Exception): pass class stream: """ A stream is the state machine that parses a stream of terminal characters and dispatches events based on what it sees. This can be attached to a screen object and it's events, or can be used some other way. `stream.basic`, `stream.escape`, and `stream.sequence` are the relevant events that get thrown with one addition: `print`. For details on the event parameters, see the [vt102 user's guide](http://vt100.net/docs/vt102-ug/) Quick example: >>> s = stream() >>> class Cursor: ... def __init__(self): ... self.x = 10; self.y = 10 ... def up(self, count): ... self.y -= count ... >>> c = Cursor() >>> s.add_event_listener("cursor-up", c.up) >>> s.process(u"\\x00\\x1b[5A") # Move the cursor up 5 rows. >>> print(c.y) 5 """ basic = { ctrl.BS: "backspace", ctrl.HT: "tab", ctrl.LF: "linefeed", ctrl.VT: "linefeed", ctrl.FF: "linefeed", ctrl.CR: "carriage-return", ctrl.SI: "shift-in", ctrl.SO: "shift-out", ctrl.BEL: "bell" } escape = { esc.IND: "index", esc.RI: "reverse-index", esc.NEL: "linefeed", esc.DECSC: "store-cursor", esc.DECRC: "restore-cursor", esc.RLF: "reverse-linefeed", } sequence = { esc.CUU: "cursor-up", esc.CUD: "cursor-down", esc.CUF: "cursor-right", esc.CUB: "cursor-left", esc.CUP: "cursor-move", esc.HVP: "cursor-move", esc.EL: "erase-in-line", esc.ED: "erase-in-display", esc.DCH: "delete-characters", esc.IL: "insert-lines", esc.DL: "delete-lines", esc.SGR: "select-graphic-rendition", esc.DECSTBM: "set-margins", esc.IRMI: "set-insert", esc.IRMR: "set-replace", } def __init__(self, fail_on_unknown_esc=True): self.state = "stream" self.params = [] self.current_param = "" self.listeners = {} self.fail_on_unknown_esc = fail_on_unknown_esc def _escape_sequence(self, char): """ Handle characters seen when in an escape sequence. Most non-vt52 commands start with a left-bracket after the escape and then a stream of parameters and a command. """ num = ord(char) if char == "[": self.state = "escape-lb" elif char == "(": self.state = "charset-g0" elif char == ")": self.state = "charset-g1" elif num in self.escape: self.dispatch(self.escape[num]) self.state = "stream" elif self.fail_on_unknown_esc: raise StreamProcessError("Unexpected character '%c' == '0x%02x'" % (char, ord(char))) def _end_escape_sequence(self, char): """ Handle the end of an escape sequence. The final character in an escape sequence is the command to execute, which corresponds to the event that is dispatched here. """ num = ord(char) if num in self.sequence: self.dispatch(self.sequence[num], *self.params) self.state = "stream" self.current_param = "" self.params = [] def _escape_parameters(self, char): """ Parse parameters in an escape sequence. Parameters are a list of numbers in ascii (e.g. '12', '4', '42', etc) separated by a semicolon (e.g. "12;4;42"). See the [vt102 user guide](http://vt100.net/docs/vt102-ug/) for more details on the formatting of escape parameters. """ if char == ";": self.params.append(int(self.current_param)) self.current_param = "" elif char == "?": self.state = "mode" elif not char.isdigit(): if len(self.current_param) > 0: self.params.append(int(self.current_param)) # If we're in parameter parsing mode, but we see a non-numeric # value, it must be the end of the control sequence. self._end_escape_sequence(char) else: self.current_param += char def _mode(self, char): if char in "lh": # 'l' or 'h' designates the end of a mode stream. We don't # really care about mode streams so anything else seen while # in the mode state, is just ignored. self.state = "stream" def _charset_g0(self, char): self.dispatch("charset-g0", char) self.state = "stream" def _charset_g1(self, char): self.dispatch("charset-g1", char) self.state = "stream" def _stream(self, char): """ Process a character when in the default 'stream' state. """ num = ord(char) if num in self.basic: self.dispatch(self.basic[num]) elif num == ctrl.ESC: self.state = "escape" elif num == 0x00: # nulls are just ignored. pass else: self.dispatch("print", char) def consume(self, char): """ Consume a single character and advance the state as necessary. """ if self.state == "stream": self._stream(char) elif self.state == "escape": self._escape_sequence(char) elif self.state == "escape-lb": self._escape_parameters(char) elif self.state == "mode": self._mode(char) elif self.state == "charset-g0": self._charset_g0(char) elif self.state == "charset-g1": self._charset_g1(char) def process(self, chars): """ Consume a string of and advance the state as necessary. """ while len(chars) > 0: self.consume(chars[0]) chars = chars[1:] def add_event_listener(self, event, function): """ Add an event listen for a particular event. Depending on the event there may or may not be parameters passed to function. Most escape streams also allow for an empty set of parameters (with a default value). Providing these default values and accepting variable arguments is the responsibility of function. More than one listener may be added for a single event. Each listener will be called. * **event** The event to listen for. * **function** The callable to invoke. """ if event not in self.listeners: self.listeners[event] = [] self.listeners[event].append(function) def dispatch(self, event, *args): """ Dispatch an event where `args` is a tuple of the arguments to send to any callbacks. If any callback throws an exception, the subsequent callbacks will be aborted. """ for callback in self.listeners.get(event, []): if len(args) > 0: callback(*args) else: callback() class screen: """ A screen is an in memory buffer of strings that represents the screen display of the terminal. It can be instantiated on it's own and given explicit commands, or it can be attached to a stream and will respond to events. The screen buffer can be accessed through the screen's `display` property. """ #: Default colors and styling. The value of this attribute should #: always be immutable, since shallow copies are made when resizing / #: applying / deleting / printing. #: #: Attributes are represented by a three-tuple that consists of the #: following: #: #: 1. A tuple of all the text attributes: `bold`, `underline`, etc #: 2. The foreground color as a string, see #: :attr:`vt102.graphics.colors` #: 3. The background color as a string, see #: :attr:`vt102.graphics.colors` default_attributes = (), "default", "default" def __init__(self, shape, encoding="utf-8"): rows, cols = shape self.encoding = encoding self.decoder = codecs.getdecoder(encoding) self.size = (rows, cols) self.x = 0 self.y = 0 self.irm = "insert" self.tabstops = [] self.g0 = None self.g1 = None self.current_charset = "g0" self.cursor_save_stack = [] # Initialize the screen to completely empty. self.display = [u" " * cols] * rows # Initialize the attributes to completely empty, but the same size as # the screen. self.attributes = [[self.default_attributes] * cols] * rows self.cursor_attributes = self.default_attributes def __repr__(self): return repr(self.display) def __str__(self): lines = ['"%s"' % l for l in self.display] return "[" + ",\n ".join(lines) + "]" def attach(self, events): """ Attach this screen to a events that processes commands and dispatches events. Sets up the appropriate event handlers so that the screen will update itself automatically as the events processes data. """ if events is not None: events.add_event_listener("print", self._print) events.add_event_listener("backspace", self._backspace) events.add_event_listener("tab", self._tab) events.add_event_listener("linefeed", self._linefeed) events.add_event_listener("reverse-linefeed", self._reverse_linefeed) events.add_event_listener("carriage-return", self._carriage_return) events.add_event_listener("index", self._index) events.add_event_listener("reverse-index", self._reverse_index) events.add_event_listener("store-cursor", self._save_cursor) events.add_event_listener("restore-cursor", self._restore_cursor) events.add_event_listener("cursor-up", self._cursor_up) events.add_event_listener("cursor-down", self._cursor_down) events.add_event_listener("cursor-right", self._cursor_forward) events.add_event_listener("cursor-left", self._cursor_back) events.add_event_listener("cursor-move", self._cursor_position) events.add_event_listener("erase-in-line", self._erase_in_line) events.add_event_listener("erase-in-display", self._erase_in_display) events.add_event_listener("delete-characters", self._delete_character) events.add_event_listener("insert-lines", self._insert_line) events.add_event_listener("delete-lines", self._delete_line) events.add_event_listener("select-graphic-rendition", self._select_graphic_rendition) events.add_event_listener("charset-g0", self._charset_g0) events.add_event_listener("charset-g1", self._charset_g1) events.add_event_listener("shift-in", self._shift_in) events.add_event_listener("shift-out", self._shift_out) events.add_event_listener("bell", self._bell) def cursor(self): """ The current location of the cursor. """ return (self.x, self.y) def resize(self, shape): """ Resize the screen. If the requested screen size has more rows than the existing screen, rows will be added at the bottom. If the requested size has less rows than the existing screen rows will be clipped at the top of the screen. Similarly if the existing screen has less columns than the requested size, columns will be added at the right, and it it has more, columns will be clipped at the right. """ rows, cols = shape # Honestly though, you can't trust anyone these days... assert(rows > 0 and cols > 0) # First resize the rows if self.size[0] < rows: # If the current display size is shorter than the requested screen # size, then add rows to the bottom. Note that the old column size # is used here so these new rows will get expanded/contracted as # necessary by the column resize when it happens next. self.display += [u" " * self.size[1]] * (rows - self.size[0]) self.attributes += [[self.default_attributes] * self.size[1]] * \ (rows - self.size[0]) elif self.size[0] > rows: # If the current display size is taller than the requested display, # then take rows off the top. self.display = self.display[self.size[0]-rows:] self.attributes = self.attributes[self.size[0]-rows:] # Next, of course, resize the columns. if self.size[1] < cols: # If the current display size is thinner than the requested size, # expand each row to be the new size. self.display = \ [row + (u" " * (cols - self.size[1])) for row in self.display] self.attributes = \ [row + ([self.default_attributes] * (cols - self.size[1])) for row in self.attributes] elif self.size[1] > cols: # If the current display size is fatter than the requested size, # then trim each row from the right to be the new size. self.display = [row[:cols-self.size[1]] for row in self.display] self.attributes = [row[:cols-self.size[1]] for row in self.attributes] self.size = (rows, cols) return self.size def _shift_in(self): self.current_charset = "g0" def _shift_out(self): self.current_charset = "g1" def _bell(self): pass def _charset_g0(self, cs): if cs == '0': self.g0 = graphics.dsg else: # TODO: Officially support UK/US/intl8 charsets self.g0 = None def _charset_g1(self, cs): if cs == '0': self.g1 = graphics.dsg else: # TODO: Officially support UK/US/intl8 charsets self.g1 = None def _print(self, char): """ Print a character at the current cursor position and advance the cursor. """ # Don't make bugs where we try to print a screen. assert len(char) == 1 try: try: # Python 3 char = self.decoder(bytes(char, self.encoding))[0] except TypeError: # Python 2.x char = self.decoder(char)[0] except UnicodeDecodeError: char = "?" if self.current_charset == "g0" and self.g0 is not None: char = char.translate(self.g0) elif self.current_charset == "g1" and self.g1 is not None: char = char.translate(self.g1) row = self.display[self.y] self.display[self.y] = row[:self.x] + char + row[self.x+1:] attrs = self.attributes[self.y] self.attributes[self.y] = attrs[:self.x] + [self.cursor_attributes] + \ attrs[self.x+1:] self.x += 1 if self.x >= self.size[1]: # If this was the last column in a row, move the cursor to the # next row. self._linefeed() def _carriage_return(self): """ Move the cursor to the beginning of the current row. """ self.x = 0 def _index(self): """ Move the cursor down one row in the same column. If the cursor is at the last row, create a new row at the bottom. """ if self.y + 1 >= self.size[0]: # If the cursor is currently on the last row, then spawn another # and scroll down (removing the top row). self.display = self.display[1:] + [u" " * self.size[1]] else: # If the cursor is anywhere else, then just move it to the # next line. self.y += 1 def _reverse_index(self): """ Move the cursor up one row in the same column. If the cursor is at the first row, create a new row at the top. """ if self.y == 0: # If the cursor is currently at the first row, then scroll the # screen up. self.display = [u" " * self.size[1]] + self.display[:-1] else: # If the cursor is anywhere other than the first row than just move # it up by one row. self.y -= 1 def _linefeed(self): """ Performs an index and then a carriage return. """ self._index() self.x = 0 def _reverse_linefeed(self): """ Performs a reverse index and then a carriage return. """ self._reverse_index() self.x = 0 def _next_tab_stop(self): """ Return the x value of the next available tabstop or the x value of the margin if there are no more tabstops. """ for stop in sorted(self.tabstops): if self.x < stop: return stop return self.size[1] - 1 def _tab(self): """ Move to the next tab space, or the end of the screen if there aren't anymore left. """ self.x = self._next_tab_stop() def _backspace(self): """ Move cursor to the left one or keep it in it's position if it's at the beginning of the line already. """ self.x = max(0, self.x-1) def _save_cursor(self): """ Push the current cursor position onto the stack. """ self.cursor_save_stack.append((self.x, self.y)) def _restore_cursor(self): """ Set the current cursor position to whatever cursor is on top of the stack. """ if len(self.cursor_save_stack): self.x, self.y = self.cursor_save_stack.pop() def _insert_line(self, count=1): """ Inserts lines at line with cursor. Lines displayed below cursor move down. Lines moved past the bottom margin are lost. """ trimmed = self.display[:self.y+1] + \ [u" " * self.size[1]] * count + \ self.display[self.y+1:self.y+count+1] self.display = trimmed[:self.size[0]] def _delete_line(self, count=1): """ Deletes count lines, starting at line with cursor. As lines are deleted, lines displayed below cursor move up. Lines added to bottom of screen have spaces with same character attributes as last line moved up. """ self.display = self.display[:self.y] + \ self.display[self.y+1:] self.display.append([u" " * self.size[1]] * count) self.attributes = self.attributes[:self.y] + \ self.attributes[self.y+1:] last_attributes = self.attributes[-1] for _ in xrange(count): self.attributes.append(copy(last_attributes)) def _delete_character(self, count=1): """ Deletes count characters, starting with the character at cursor position. When a character is deleted, all characters to the right of cursor move left. """ # First resize the text display row = self.display[self.y] count = min(count, self.size[1] - self.x) row = row[:self.x] + row[self.x+count:] + u" " * count self.display[self.y] = row # Then resize the attribute array too attrs = self.attributes[self.y] attrs = attrs[:self.x] + attrs[self.x+count:] + [self.default_attributes] * count self.attributes[self.y] = attrs def _erase_in_line(self, type_of=0): """ Erases the row in a specific way, depending on the type_of. """ row = self.display[self.y] attrs = self.attributes[self.y] if type_of == 0: # Erase from the cursor to the end of line, including the cursor row = row[:self.x] + u" " * (self.size[1] - self.x) attrs = attrs[:self.x] + [self.default_attributes] * (self.size[1] - self.x) elif type_of == 1: # Erase from the beginning of the line to the cursor, including it row = u" " * (self.x+1) + row[self.x+1:] attrs = [self.default_attributes] * (self.x+1) + attrs[self.x+1:] elif type_of == 2: # Erase the entire line. row = u" " * self.size[1] attrs = [self.default_attributes] * self.size[1] self.display[self.y] = row self.attributes[self.y] = attrs def _erase_in_display(self, type_of=0): if type_of == 0: # Erase from cursor to the end of the display, including the # cursor. self.display = self.display[:self.y] + \ [u" " * self.size[1]] * (self.size[0] - self.y) self.attributes = self.attributes[:self.y] + \ [[self.default_attributes] * self.size[1]] * (self.size[0] - self.y) elif type_of == 1: # Erase from the beginning of the display to the cursor, including # it. self.display = [u" " * self.size[1]] * (self.y + 1) + \ self.display[self.y+1:] self.attributes = [[self.default_attributes] * self.size[1]] * (self.y + 1) + \ self.attributes[self.y+1:] elif type_of == 2: # Erase the whole display. self.display = [u" " * self.size[1]] * self.size[0] self.attributes = [[self.default_attributes] * self.size[1]] * self.size[0] def _set_insert_mode(self): self.irm = "insert" def _set_replace_mode(self): self.irm = "replace" def _set_tab_stop(self): """ Sets a horizontal tab stop at cursor position. """ self.tabstops.append(self.x) def _clear_tab_stop(self, type_of=0x33): if type_of == 0x30: # Clears a horizontal tab stop at cursor position. try: self.tabstops.remove(self.x) except ValueError: # If there is no tabstop at the current position, then just do # nothing. pass elif type_of == 0x33: # Clears all horizontal tab stops self.tabstops = [] def _cursor_up(self, count=1): """ Moves cursor up count lines in same column. Cursor stops at top margin. """ self.y = max(0, self.y - count) def _cursor_down(self, count=1): """ Moves cursor down count lines in same column. Cursor stops at bottom margin. """ self.y = min(self.size[0] - 1, self.y + count) def _cursor_back(self, count=1): """ Moves cursor left count columns. Cursor stops at left margin. """ self.x = max(0, self.x - count) def _cursor_forward(self, count=1): """ Moves cursor right count columns. Cursor stops at right margin. """ self.x = min(self.size[1] - 1, self.x + count) def _cursor_position(self, row=0, column=0): """ Set the cursor to a specific row and column. Obnoxiously row/column is 1 based, instead of zero based, so we need to compensate. I know I've created bugs in here somehow. Confoundingly, inputs of 0 are still acceptable, and should move to the beginning of the row/column as if they were 1. *sigh* """ if row == 0: row = 1 if column == 0: column = 1 self.y = min(row - 1, self.size[0] - 1) self.x = min(column - 1, self.size[1] - 1) def _home(self): """ Set the cursor to (0, 0) """ self.y = self.x = 0 def _remove_text_attr(self, attr): current = set(self.cursor_attributes[0]) if attr in current: current.remove(attr) return tuple(current) + self.cursor_attributes[1:] def _add_text_attr(self, attr): current = set(self.cursor_attributes[0]) current.add(attr) attrs = self.cursor_attributes[1:] return (tuple(current), attrs[0], attrs[1]) def _text_attr(self, attr): """ Given a text attribute, set the current cursor appropriately. """ attr = text[attr] if attr == "reset": self.cursor_attributes = self.default_attributes elif attr == "underline-off": self.cursor_attributes = self._remove_text_attr("underline") elif attr == "blink-off": self.cursor_attributes = self._remove_text_attr("blink") elif attr == "reverse-off": self.cursor_attributes = self._remove_text_attr("reverse") else: self.cursor_attributes = self._add_text_attr(attr) def _color_attr(self, ground, attr): """ Given a color attribute, set the current cursor appropriately. """ attr = colors[ground][attr] attrs = self.cursor_attributes if ground == "foreground": self.cursor_attributes = (attrs[0], attr, attrs[2]) elif ground == "background": self.cursor_attributes = (attrs[0], attrs[1], attr) def _set_attr(self, attr): """ Given some text attribute, set the current cursor attributes appropriately. """ if attr in text: self._text_attr(attr) elif attr in colors["foreground"]: self._color_attr("foreground", attr) elif attr in colors["background"]: self._color_attr("background", attr) def _select_graphic_rendition(self, *attrs): """ Set the current text attribute. """ if len(attrs) == 0: # No arguments means that we're really trying to do a reset. attrs = [0] for attr in attrs: self._set_attr(attr)
Module variables
var colors
var text
Classes
class StreamProcessError
class StreamProcessError(Exception): pass
Ancestors (in MRO)
- StreamProcessError
- builtins.Exception
- builtins.BaseException
- builtins.object
Class variables
var args
class screen
A screen is an in memory buffer of strings that represents the screen display of the terminal. It can be instantiated on it's own and given explicit commands, or it can be attached to a stream and will respond to events.
The screen buffer can be accessed through the screen's display
property.
class screen: """ A screen is an in memory buffer of strings that represents the screen display of the terminal. It can be instantiated on it's own and given explicit commands, or it can be attached to a stream and will respond to events. The screen buffer can be accessed through the screen's `display` property. """ #: Default colors and styling. The value of this attribute should #: always be immutable, since shallow copies are made when resizing / #: applying / deleting / printing. #: #: Attributes are represented by a three-tuple that consists of the #: following: #: #: 1. A tuple of all the text attributes: `bold`, `underline`, etc #: 2. The foreground color as a string, see #: :attr:`vt102.graphics.colors` #: 3. The background color as a string, see #: :attr:`vt102.graphics.colors` default_attributes = (), "default", "default" def __init__(self, shape, encoding="utf-8"): rows, cols = shape self.encoding = encoding self.decoder = codecs.getdecoder(encoding) self.size = (rows, cols) self.x = 0 self.y = 0 self.irm = "insert" self.tabstops = [] self.g0 = None self.g1 = None self.current_charset = "g0" self.cursor_save_stack = [] # Initialize the screen to completely empty. self.display = [u" " * cols] * rows # Initialize the attributes to completely empty, but the same size as # the screen. self.attributes = [[self.default_attributes] * cols] * rows self.cursor_attributes = self.default_attributes def __repr__(self): return repr(self.display) def __str__(self): lines = ['"%s"' % l for l in self.display] return "[" + ",\n ".join(lines) + "]" def attach(self, events): """ Attach this screen to a events that processes commands and dispatches events. Sets up the appropriate event handlers so that the screen will update itself automatically as the events processes data. """ if events is not None: events.add_event_listener("print", self._print) events.add_event_listener("backspace", self._backspace) events.add_event_listener("tab", self._tab) events.add_event_listener("linefeed", self._linefeed) events.add_event_listener("reverse-linefeed", self._reverse_linefeed) events.add_event_listener("carriage-return", self._carriage_return) events.add_event_listener("index", self._index) events.add_event_listener("reverse-index", self._reverse_index) events.add_event_listener("store-cursor", self._save_cursor) events.add_event_listener("restore-cursor", self._restore_cursor) events.add_event_listener("cursor-up", self._cursor_up) events.add_event_listener("cursor-down", self._cursor_down) events.add_event_listener("cursor-right", self._cursor_forward) events.add_event_listener("cursor-left", self._cursor_back) events.add_event_listener("cursor-move", self._cursor_position) events.add_event_listener("erase-in-line", self._erase_in_line) events.add_event_listener("erase-in-display", self._erase_in_display) events.add_event_listener("delete-characters", self._delete_character) events.add_event_listener("insert-lines", self._insert_line) events.add_event_listener("delete-lines", self._delete_line) events.add_event_listener("select-graphic-rendition", self._select_graphic_rendition) events.add_event_listener("charset-g0", self._charset_g0) events.add_event_listener("charset-g1", self._charset_g1) events.add_event_listener("shift-in", self._shift_in) events.add_event_listener("shift-out", self._shift_out) events.add_event_listener("bell", self._bell) def cursor(self): """ The current location of the cursor. """ return (self.x, self.y) def resize(self, shape): """ Resize the screen. If the requested screen size has more rows than the existing screen, rows will be added at the bottom. If the requested size has less rows than the existing screen rows will be clipped at the top of the screen. Similarly if the existing screen has less columns than the requested size, columns will be added at the right, and it it has more, columns will be clipped at the right. """ rows, cols = shape # Honestly though, you can't trust anyone these days... assert(rows > 0 and cols > 0) # First resize the rows if self.size[0] < rows: # If the current display size is shorter than the requested screen # size, then add rows to the bottom. Note that the old column size # is used here so these new rows will get expanded/contracted as # necessary by the column resize when it happens next. self.display += [u" " * self.size[1]] * (rows - self.size[0]) self.attributes += [[self.default_attributes] * self.size[1]] * \ (rows - self.size[0]) elif self.size[0] > rows: # If the current display size is taller than the requested display, # then take rows off the top. self.display = self.display[self.size[0]-rows:] self.attributes = self.attributes[self.size[0]-rows:] # Next, of course, resize the columns. if self.size[1] < cols: # If the current display size is thinner than the requested size, # expand each row to be the new size. self.display = \ [row + (u" " * (cols - self.size[1])) for row in self.display] self.attributes = \ [row + ([self.default_attributes] * (cols - self.size[1])) for row in self.attributes] elif self.size[1] > cols: # If the current display size is fatter than the requested size, # then trim each row from the right to be the new size. self.display = [row[:cols-self.size[1]] for row in self.display] self.attributes = [row[:cols-self.size[1]] for row in self.attributes] self.size = (rows, cols) return self.size def _shift_in(self): self.current_charset = "g0" def _shift_out(self): self.current_charset = "g1" def _bell(self): pass def _charset_g0(self, cs): if cs == '0': self.g0 = graphics.dsg else: # TODO: Officially support UK/US/intl8 charsets self.g0 = None def _charset_g1(self, cs): if cs == '0': self.g1 = graphics.dsg else: # TODO: Officially support UK/US/intl8 charsets self.g1 = None def _print(self, char): """ Print a character at the current cursor position and advance the cursor. """ # Don't make bugs where we try to print a screen. assert len(char) == 1 try: try: # Python 3 char = self.decoder(bytes(char, self.encoding))[0] except TypeError: # Python 2.x char = self.decoder(char)[0] except UnicodeDecodeError: char = "?" if self.current_charset == "g0" and self.g0 is not None: char = char.translate(self.g0) elif self.current_charset == "g1" and self.g1 is not None: char = char.translate(self.g1) row = self.display[self.y] self.display[self.y] = row[:self.x] + char + row[self.x+1:] attrs = self.attributes[self.y] self.attributes[self.y] = attrs[:self.x] + [self.cursor_attributes] + \ attrs[self.x+1:] self.x += 1 if self.x >= self.size[1]: # If this was the last column in a row, move the cursor to the # next row. self._linefeed() def _carriage_return(self): """ Move the cursor to the beginning of the current row. """ self.x = 0 def _index(self): """ Move the cursor down one row in the same column. If the cursor is at the last row, create a new row at the bottom. """ if self.y + 1 >= self.size[0]: # If the cursor is currently on the last row, then spawn another # and scroll down (removing the top row). self.display = self.display[1:] + [u" " * self.size[1]] else: # If the cursor is anywhere else, then just move it to the # next line. self.y += 1 def _reverse_index(self): """ Move the cursor up one row in the same column. If the cursor is at the first row, create a new row at the top. """ if self.y == 0: # If the cursor is currently at the first row, then scroll the # screen up. self.display = [u" " * self.size[1]] + self.display[:-1] else: # If the cursor is anywhere other than the first row than just move # it up by one row. self.y -= 1 def _linefeed(self): """ Performs an index and then a carriage return. """ self._index() self.x = 0 def _reverse_linefeed(self): """ Performs a reverse index and then a carriage return. """ self._reverse_index() self.x = 0 def _next_tab_stop(self): """ Return the x value of the next available tabstop or the x value of the margin if there are no more tabstops. """ for stop in sorted(self.tabstops): if self.x < stop: return stop return self.size[1] - 1 def _tab(self): """ Move to the next tab space, or the end of the screen if there aren't anymore left. """ self.x = self._next_tab_stop() def _backspace(self): """ Move cursor to the left one or keep it in it's position if it's at the beginning of the line already. """ self.x = max(0, self.x-1) def _save_cursor(self): """ Push the current cursor position onto the stack. """ self.cursor_save_stack.append((self.x, self.y)) def _restore_cursor(self): """ Set the current cursor position to whatever cursor is on top of the stack. """ if len(self.cursor_save_stack): self.x, self.y = self.cursor_save_stack.pop() def _insert_line(self, count=1): """ Inserts lines at line with cursor. Lines displayed below cursor move down. Lines moved past the bottom margin are lost. """ trimmed = self.display[:self.y+1] + \ [u" " * self.size[1]] * count + \ self.display[self.y+1:self.y+count+1] self.display = trimmed[:self.size[0]] def _delete_line(self, count=1): """ Deletes count lines, starting at line with cursor. As lines are deleted, lines displayed below cursor move up. Lines added to bottom of screen have spaces with same character attributes as last line moved up. """ self.display = self.display[:self.y] + \ self.display[self.y+1:] self.display.append([u" " * self.size[1]] * count) self.attributes = self.attributes[:self.y] + \ self.attributes[self.y+1:] last_attributes = self.attributes[-1] for _ in xrange(count): self.attributes.append(copy(last_attributes)) def _delete_character(self, count=1): """ Deletes count characters, starting with the character at cursor position. When a character is deleted, all characters to the right of cursor move left. """ # First resize the text display row = self.display[self.y] count = min(count, self.size[1] - self.x) row = row[:self.x] + row[self.x+count:] + u" " * count self.display[self.y] = row # Then resize the attribute array too attrs = self.attributes[self.y] attrs = attrs[:self.x] + attrs[self.x+count:] + [self.default_attributes] * count self.attributes[self.y] = attrs def _erase_in_line(self, type_of=0): """ Erases the row in a specific way, depending on the type_of. """ row = self.display[self.y] attrs = self.attributes[self.y] if type_of == 0: # Erase from the cursor to the end of line, including the cursor row = row[:self.x] + u" " * (self.size[1] - self.x) attrs = attrs[:self.x] + [self.default_attributes] * (self.size[1] - self.x) elif type_of == 1: # Erase from the beginning of the line to the cursor, including it row = u" " * (self.x+1) + row[self.x+1:] attrs = [self.default_attributes] * (self.x+1) + attrs[self.x+1:] elif type_of == 2: # Erase the entire line. row = u" " * self.size[1] attrs = [self.default_attributes] * self.size[1] self.display[self.y] = row self.attributes[self.y] = attrs def _erase_in_display(self, type_of=0): if type_of == 0: # Erase from cursor to the end of the display, including the # cursor. self.display = self.display[:self.y] + \ [u" " * self.size[1]] * (self.size[0] - self.y) self.attributes = self.attributes[:self.y] + \ [[self.default_attributes] * self.size[1]] * (self.size[0] - self.y) elif type_of == 1: # Erase from the beginning of the display to the cursor, including # it. self.display = [u" " * self.size[1]] * (self.y + 1) + \ self.display[self.y+1:] self.attributes = [[self.default_attributes] * self.size[1]] * (self.y + 1) + \ self.attributes[self.y+1:] elif type_of == 2: # Erase the whole display. self.display = [u" " * self.size[1]] * self.size[0] self.attributes = [[self.default_attributes] * self.size[1]] * self.size[0] def _set_insert_mode(self): self.irm = "insert" def _set_replace_mode(self): self.irm = "replace" def _set_tab_stop(self): """ Sets a horizontal tab stop at cursor position. """ self.tabstops.append(self.x) def _clear_tab_stop(self, type_of=0x33): if type_of == 0x30: # Clears a horizontal tab stop at cursor position. try: self.tabstops.remove(self.x) except ValueError: # If there is no tabstop at the current position, then just do # nothing. pass elif type_of == 0x33: # Clears all horizontal tab stops self.tabstops = [] def _cursor_up(self, count=1): """ Moves cursor up count lines in same column. Cursor stops at top margin. """ self.y = max(0, self.y - count) def _cursor_down(self, count=1): """ Moves cursor down count lines in same column. Cursor stops at bottom margin. """ self.y = min(self.size[0] - 1, self.y + count) def _cursor_back(self, count=1): """ Moves cursor left count columns. Cursor stops at left margin. """ self.x = max(0, self.x - count) def _cursor_forward(self, count=1): """ Moves cursor right count columns. Cursor stops at right margin. """ self.x = min(self.size[1] - 1, self.x + count) def _cursor_position(self, row=0, column=0): """ Set the cursor to a specific row and column. Obnoxiously row/column is 1 based, instead of zero based, so we need to compensate. I know I've created bugs in here somehow. Confoundingly, inputs of 0 are still acceptable, and should move to the beginning of the row/column as if they were 1. *sigh* """ if row == 0: row = 1 if column == 0: column = 1 self.y = min(row - 1, self.size[0] - 1) self.x = min(column - 1, self.size[1] - 1) def _home(self): """ Set the cursor to (0, 0) """ self.y = self.x = 0 def _remove_text_attr(self, attr): current = set(self.cursor_attributes[0]) if attr in current: current.remove(attr) return tuple(current) + self.cursor_attributes[1:] def _add_text_attr(self, attr): current = set(self.cursor_attributes[0]) current.add(attr) attrs = self.cursor_attributes[1:] return (tuple(current), attrs[0], attrs[1]) def _text_attr(self, attr): """ Given a text attribute, set the current cursor appropriately. """ attr = text[attr] if attr == "reset": self.cursor_attributes = self.default_attributes elif attr == "underline-off": self.cursor_attributes = self._remove_text_attr("underline") elif attr == "blink-off": self.cursor_attributes = self._remove_text_attr("blink") elif attr == "reverse-off": self.cursor_attributes = self._remove_text_attr("reverse") else: self.cursor_attributes = self._add_text_attr(attr) def _color_attr(self, ground, attr): """ Given a color attribute, set the current cursor appropriately. """ attr = colors[ground][attr] attrs = self.cursor_attributes if ground == "foreground": self.cursor_attributes = (attrs[0], attr, attrs[2]) elif ground == "background": self.cursor_attributes = (attrs[0], attrs[1], attr) def _set_attr(self, attr): """ Given some text attribute, set the current cursor attributes appropriately. """ if attr in text: self._text_attr(attr) elif attr in colors["foreground"]: self._color_attr("foreground", attr) elif attr in colors["background"]: self._color_attr("background", attr) def _select_graphic_rendition(self, *attrs): """ Set the current text attribute. """ if len(attrs) == 0: # No arguments means that we're really trying to do a reset. attrs = [0] for attr in attrs: self._set_attr(attr)
Ancestors (in MRO)
- screen
- builtins.object
Class variables
var default_attributes
Static methods
def __init__(
self, shape, encoding='utf-8')
def __init__(self, shape, encoding="utf-8"): rows, cols = shape self.encoding = encoding self.decoder = codecs.getdecoder(encoding) self.size = (rows, cols) self.x = 0 self.y = 0 self.irm = "insert" self.tabstops = [] self.g0 = None self.g1 = None self.current_charset = "g0" self.cursor_save_stack = [] # Initialize the screen to completely empty. self.display = [u" " * cols] * rows # Initialize the attributes to completely empty, but the same size as # the screen. self.attributes = [[self.default_attributes] * cols] * rows self.cursor_attributes = self.default_attributes
def attach(
self, events)
Attach this screen to a events that processes commands and dispatches events. Sets up the appropriate event handlers so that the screen will update itself automatically as the events processes data.
def attach(self, events): """ Attach this screen to a events that processes commands and dispatches events. Sets up the appropriate event handlers so that the screen will update itself automatically as the events processes data. """ if events is not None: events.add_event_listener("print", self._print) events.add_event_listener("backspace", self._backspace) events.add_event_listener("tab", self._tab) events.add_event_listener("linefeed", self._linefeed) events.add_event_listener("reverse-linefeed", self._reverse_linefeed) events.add_event_listener("carriage-return", self._carriage_return) events.add_event_listener("index", self._index) events.add_event_listener("reverse-index", self._reverse_index) events.add_event_listener("store-cursor", self._save_cursor) events.add_event_listener("restore-cursor", self._restore_cursor) events.add_event_listener("cursor-up", self._cursor_up) events.add_event_listener("cursor-down", self._cursor_down) events.add_event_listener("cursor-right", self._cursor_forward) events.add_event_listener("cursor-left", self._cursor_back) events.add_event_listener("cursor-move", self._cursor_position) events.add_event_listener("erase-in-line", self._erase_in_line) events.add_event_listener("erase-in-display", self._erase_in_display) events.add_event_listener("delete-characters", self._delete_character) events.add_event_listener("insert-lines", self._insert_line) events.add_event_listener("delete-lines", self._delete_line) events.add_event_listener("select-graphic-rendition", self._select_graphic_rendition) events.add_event_listener("charset-g0", self._charset_g0) events.add_event_listener("charset-g1", self._charset_g1) events.add_event_listener("shift-in", self._shift_in) events.add_event_listener("shift-out", self._shift_out) events.add_event_listener("bell", self._bell)
def cursor(
self)
The current location of the cursor.
def cursor(self): """ The current location of the cursor. """ return (self.x, self.y)
def resize(
self, shape)
Resize the screen. If the requested screen size has more rows than the existing screen, rows will be added at the bottom. If the requested size has less rows than the existing screen rows will be clipped at the top of the screen.
Similarly if the existing screen has less columns than the requested size, columns will be added at the right, and it it has more, columns will be clipped at the right.
def resize(self, shape): """ Resize the screen. If the requested screen size has more rows than the existing screen, rows will be added at the bottom. If the requested size has less rows than the existing screen rows will be clipped at the top of the screen. Similarly if the existing screen has less columns than the requested size, columns will be added at the right, and it it has more, columns will be clipped at the right. """ rows, cols = shape # Honestly though, you can't trust anyone these days... assert(rows > 0 and cols > 0) # First resize the rows if self.size[0] < rows: # If the current display size is shorter than the requested screen # size, then add rows to the bottom. Note that the old column size # is used here so these new rows will get expanded/contracted as # necessary by the column resize when it happens next. self.display += [u" " * self.size[1]] * (rows - self.size[0]) self.attributes += [[self.default_attributes] * self.size[1]] * \ (rows - self.size[0]) elif self.size[0] > rows: # If the current display size is taller than the requested display, # then take rows off the top. self.display = self.display[self.size[0]-rows:] self.attributes = self.attributes[self.size[0]-rows:] # Next, of course, resize the columns. if self.size[1] < cols: # If the current display size is thinner than the requested size, # expand each row to be the new size. self.display = \ [row + (u" " * (cols - self.size[1])) for row in self.display] self.attributes = \ [row + ([self.default_attributes] * (cols - self.size[1])) for row in self.attributes] elif self.size[1] > cols: # If the current display size is fatter than the requested size, # then trim each row from the right to be the new size. self.display = [row[:cols-self.size[1]] for row in self.display] self.attributes = [row[:cols-self.size[1]] for row in self.attributes] self.size = (rows, cols) return self.size
Instance variables
var attributes
var current_charset
var cursor_attributes
var cursor_save_stack
var decoder
var display
var encoding
var g0
var g1
var irm
var size
var tabstops
var x
var y
class stream
A stream is the state machine that parses a stream of terminal characters and dispatches events based on what it sees. This can be attached to a screen object and it's events, or can be used some other way.
stream.basic
, stream.escape
, and stream.sequence
are the relevant
events that get thrown with one addition: print
. For details on the
event parameters, see the vt102 user's
guide
Quick example:
>>> s = stream() >>> class Cursor: ... def __init__(self): ... self.x = 10; self.y = 10 ... def up(self, count): ... self.y -= count ... >>> c = Cursor() >>> s.add_event_listener("cursor-up", c.up) >>> s.process(u"\x00\x1b[5A") # Move the cursor up 5 rows. >>> print(c.y) 5
class stream: """ A stream is the state machine that parses a stream of terminal characters and dispatches events based on what it sees. This can be attached to a screen object and it's events, or can be used some other way. `stream.basic`, `stream.escape`, and `stream.sequence` are the relevant events that get thrown with one addition: `print`. For details on the event parameters, see the [vt102 user's guide](http://vt100.net/docs/vt102-ug/) Quick example: >>> s = stream() >>> class Cursor: ... def __init__(self): ... self.x = 10; self.y = 10 ... def up(self, count): ... self.y -= count ... >>> c = Cursor() >>> s.add_event_listener("cursor-up", c.up) >>> s.process(u"\\x00\\x1b[5A") # Move the cursor up 5 rows. >>> print(c.y) 5 """ basic = { ctrl.BS: "backspace", ctrl.HT: "tab", ctrl.LF: "linefeed", ctrl.VT: "linefeed", ctrl.FF: "linefeed", ctrl.CR: "carriage-return", ctrl.SI: "shift-in", ctrl.SO: "shift-out", ctrl.BEL: "bell" } escape = { esc.IND: "index", esc.RI: "reverse-index", esc.NEL: "linefeed", esc.DECSC: "store-cursor", esc.DECRC: "restore-cursor", esc.RLF: "reverse-linefeed", } sequence = { esc.CUU: "cursor-up", esc.CUD: "cursor-down", esc.CUF: "cursor-right", esc.CUB: "cursor-left", esc.CUP: "cursor-move", esc.HVP: "cursor-move", esc.EL: "erase-in-line", esc.ED: "erase-in-display", esc.DCH: "delete-characters", esc.IL: "insert-lines", esc.DL: "delete-lines", esc.SGR: "select-graphic-rendition", esc.DECSTBM: "set-margins", esc.IRMI: "set-insert", esc.IRMR: "set-replace", } def __init__(self, fail_on_unknown_esc=True): self.state = "stream" self.params = [] self.current_param = "" self.listeners = {} self.fail_on_unknown_esc = fail_on_unknown_esc def _escape_sequence(self, char): """ Handle characters seen when in an escape sequence. Most non-vt52 commands start with a left-bracket after the escape and then a stream of parameters and a command. """ num = ord(char) if char == "[": self.state = "escape-lb" elif char == "(": self.state = "charset-g0" elif char == ")": self.state = "charset-g1" elif num in self.escape: self.dispatch(self.escape[num]) self.state = "stream" elif self.fail_on_unknown_esc: raise StreamProcessError("Unexpected character '%c' == '0x%02x'" % (char, ord(char))) def _end_escape_sequence(self, char): """ Handle the end of an escape sequence. The final character in an escape sequence is the command to execute, which corresponds to the event that is dispatched here. """ num = ord(char) if num in self.sequence: self.dispatch(self.sequence[num], *self.params) self.state = "stream" self.current_param = "" self.params = [] def _escape_parameters(self, char): """ Parse parameters in an escape sequence. Parameters are a list of numbers in ascii (e.g. '12', '4', '42', etc) separated by a semicolon (e.g. "12;4;42"). See the [vt102 user guide](http://vt100.net/docs/vt102-ug/) for more details on the formatting of escape parameters. """ if char == ";": self.params.append(int(self.current_param)) self.current_param = "" elif char == "?": self.state = "mode" elif not char.isdigit(): if len(self.current_param) > 0: self.params.append(int(self.current_param)) # If we're in parameter parsing mode, but we see a non-numeric # value, it must be the end of the control sequence. self._end_escape_sequence(char) else: self.current_param += char def _mode(self, char): if char in "lh": # 'l' or 'h' designates the end of a mode stream. We don't # really care about mode streams so anything else seen while # in the mode state, is just ignored. self.state = "stream" def _charset_g0(self, char): self.dispatch("charset-g0", char) self.state = "stream" def _charset_g1(self, char): self.dispatch("charset-g1", char) self.state = "stream" def _stream(self, char): """ Process a character when in the default 'stream' state. """ num = ord(char) if num in self.basic: self.dispatch(self.basic[num]) elif num == ctrl.ESC: self.state = "escape" elif num == 0x00: # nulls are just ignored. pass else: self.dispatch("print", char) def consume(self, char): """ Consume a single character and advance the state as necessary. """ if self.state == "stream": self._stream(char) elif self.state == "escape": self._escape_sequence(char) elif self.state == "escape-lb": self._escape_parameters(char) elif self.state == "mode": self._mode(char) elif self.state == "charset-g0": self._charset_g0(char) elif self.state == "charset-g1": self._charset_g1(char) def process(self, chars): """ Consume a string of and advance the state as necessary. """ while len(chars) > 0: self.consume(chars[0]) chars = chars[1:] def add_event_listener(self, event, function): """ Add an event listen for a particular event. Depending on the event there may or may not be parameters passed to function. Most escape streams also allow for an empty set of parameters (with a default value). Providing these default values and accepting variable arguments is the responsibility of function. More than one listener may be added for a single event. Each listener will be called. * **event** The event to listen for. * **function** The callable to invoke. """ if event not in self.listeners: self.listeners[event] = [] self.listeners[event].append(function) def dispatch(self, event, *args): """ Dispatch an event where `args` is a tuple of the arguments to send to any callbacks. If any callback throws an exception, the subsequent callbacks will be aborted. """ for callback in self.listeners.get(event, []): if len(args) > 0: callback(*args) else: callback()
Ancestors (in MRO)
- stream
- builtins.object
Class variables
var basic
var escape
var sequence
Static methods
def __init__(
self, fail_on_unknown_esc=True)
def __init__(self, fail_on_unknown_esc=True): self.state = "stream" self.params = [] self.current_param = "" self.listeners = {} self.fail_on_unknown_esc = fail_on_unknown_esc
def add_event_listener(
self, event, function)
Add an event listen for a particular event. Depending on the event there may or may not be parameters passed to function. Most escape streams also allow for an empty set of parameters (with a default value). Providing these default values and accepting variable arguments is the responsibility of function.
More than one listener may be added for a single event. Each listener will be called.
- event The event to listen for.
- function The callable to invoke.
def add_event_listener(self, event, function): """ Add an event listen for a particular event. Depending on the event there may or may not be parameters passed to function. Most escape streams also allow for an empty set of parameters (with a default value). Providing these default values and accepting variable arguments is the responsibility of function. More than one listener may be added for a single event. Each listener will be called. * **event** The event to listen for. * **function** The callable to invoke. """ if event not in self.listeners: self.listeners[event] = [] self.listeners[event].append(function)
def consume(
self, char)
Consume a single character and advance the state as necessary.
def consume(self, char): """ Consume a single character and advance the state as necessary. """ if self.state == "stream": self._stream(char) elif self.state == "escape": self._escape_sequence(char) elif self.state == "escape-lb": self._escape_parameters(char) elif self.state == "mode": self._mode(char) elif self.state == "charset-g0": self._charset_g0(char) elif self.state == "charset-g1": self._charset_g1(char)
def dispatch(
self, event, *args)
Dispatch an event where args
is a tuple of the arguments to send to
any callbacks. If any callback throws an exception, the subsequent
callbacks will be aborted.
def dispatch(self, event, *args): """ Dispatch an event where `args` is a tuple of the arguments to send to any callbacks. If any callback throws an exception, the subsequent callbacks will be aborted. """ for callback in self.listeners.get(event, []): if len(args) > 0: callback(*args) else: callback()
def process(
self, chars)
Consume a string of and advance the state as necessary.
def process(self, chars): """ Consume a string of and advance the state as necessary. """ while len(chars) > 0: self.consume(chars[0]) chars = chars[1:]
Instance variables
var current_param
var fail_on_unknown_esc
var listeners
var params
var state