ansiterm.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. Emulate a vt100 terminal in cmd.exe
  5. By wrapping sys.stdout / sys.stderr with Ansiterm,
  6. the vt100 escape characters will be interpreted and
  7. the equivalent actions will be performed with Win32
  8. console commands.
  9. """
  10. import os, re, sys
  11. from waflib import Utils
  12. wlock = Utils.threading.Lock()
  13. try:
  14. from ctypes import Structure, WinDLL, c_short, c_ushort, c_ulong, c_int, byref, c_wchar, POINTER, c_long
  15. except ImportError:
  class AnsiTerm(object):
      """
      Pass-through wrapper used when ctypes cannot be imported.

      No escape sequence is interpreted: text is forwarded to the wrapped
      stream unchanged, with writes serialized through the module lock.
      """

      def __init__(self, stream):
          """
          :param stream: file-like object to wrap (for example sys.stdout)
          """
          self.stream = stream
          try:
              self.errors = self.stream.errors
          except AttributeError:
              pass # python 2.5
          self.encoding = self.stream.encoding

      def write(self, txt):
          """
          Write *txt* to the wrapped stream and flush it while holding wlock.
          """
          # Acquire outside the try block: if acquire() itself fails, the
          # finally clause must not release a lock that was never taken.
          wlock.acquire()
          try:
              self.stream.write(txt)
              self.stream.flush()
          finally:
              wlock.release()

      def fileno(self):
          """Return the file descriptor of the wrapped stream."""
          return self.stream.fileno()

      def flush(self):
          """Flush the wrapped stream."""
          self.stream.flush()

      def isatty(self):
          """Report whether the wrapped stream is attached to a terminal."""
          return self.stream.isatty()
  37. else:
  class COORD(Structure):
      """Pair of signed 16-bit console coordinates (X, Y)."""
      _fields_ = [
          ("X", c_short),
          ("Y", c_short),
      ]
  class SMALL_RECT(Structure):
      """Rectangle described by four signed 16-bit edges."""
      _fields_ = [
          ("Left", c_short),
          ("Top", c_short),
          ("Right", c_short),
          ("Bottom", c_short),
      ]
  class CONSOLE_SCREEN_BUFFER_INFO(Structure):
      """Record filled in by GetConsoleScreenBufferInfo."""
      _fields_ = [
          ("Size", COORD),
          ("CursorPosition", COORD),
          ("Attributes", c_ushort),
          ("Window", SMALL_RECT),
          ("MaximumWindowSize", COORD),
      ]
  class CONSOLE_CURSOR_INFO(Structure):
      """Record exchanged with GetConsoleCursorInfo / SetConsoleCursorInfo."""
      _fields_ = [
          ("dwSize", c_ulong),
          ("bVisible", c_int),
      ]
  46. try:
  47. _type = unicode
  48. except NameError:
  49. _type = str
  def to_int(number, default):
      """
      Convert an escape-sequence parameter to an integer.

      Returns *default* when *number* is empty/None, and also when it
      converts to zero.
      """
      if number:
          value = int(number)
          if value:
              return value
      return default
  # Identifiers passed to GetStdHandle to select a standard stream
  STD_OUTPUT_HANDLE = -11
  STD_ERROR_HANDLE = -12

  kernel32 = WinDLL('kernel32')

  # Explicit prototypes for every console function used below, so that ctypes
  # converts arguments and return values consistently.
  kernel32.GetStdHandle.argtypes = [c_ulong]
  kernel32.GetStdHandle.restype = c_ulong
  kernel32.GetConsoleScreenBufferInfo.argtypes = [c_ulong, POINTER(CONSOLE_SCREEN_BUFFER_INFO)]
  kernel32.GetConsoleScreenBufferInfo.restype = c_long
  kernel32.SetConsoleTextAttribute.argtypes = [c_ulong, c_ushort]
  kernel32.SetConsoleTextAttribute.restype = c_long
  # The Win32 prototypes take the COORD argument by value. Declaring it as
  # POINTER(COORD) makes ctypes pass the address of the structure, which the
  # API would then read as a pair of coordinates.
  kernel32.FillConsoleOutputCharacterW.argtypes = [c_ulong, c_wchar, c_ulong, COORD, POINTER(c_ulong)]
  kernel32.FillConsoleOutputCharacterW.restype = c_long
  kernel32.FillConsoleOutputAttribute.argtypes = [c_ulong, c_ushort, c_ulong, COORD, POINTER(c_ulong)]
  kernel32.FillConsoleOutputAttribute.restype = c_long
  kernel32.SetConsoleCursorPosition.argtypes = [c_ulong, COORD]
  kernel32.SetConsoleCursorPosition.restype = c_long
  # GetConsoleCursorInfo is called by AnsiTerm.__init__, so it gets a
  # prototype like its Set counterpart.
  kernel32.GetConsoleCursorInfo.argtypes = [c_ulong, POINTER(CONSOLE_CURSOR_INFO)]
  kernel32.GetConsoleCursorInfo.restype = c_long
  kernel32.SetConsoleCursorInfo.argtypes = [c_ulong, POINTER(CONSOLE_CURSOR_INFO)]
  kernel32.SetConsoleCursorInfo.restype = c_long
  class AnsiTerm(object):
      """
      Emulate a vt100 terminal in cmd.exe.

      Wraps a stream such as sys.stdout or sys.stderr. Text given to write()
      is split into "ESC [ parameters letter" sequences and plain text; the
      sequences listed in ansi_command_table are carried out through
      kernel32 console calls, the plain text is sent to the console.
      """

      def __init__(self, s):
          """
          :param s: stream to wrap; fileno() selects the console handle
          """
          self.stream = s
          try:
              self.errors = s.errors
          except AttributeError:
              pass # python2.5
          self.encoding = s.encoding
          self.cursor_history = []
          # file descriptor 2 selects the stderr handle, anything else stdout
          handle = STD_ERROR_HANDLE if s.fileno() == 2 else STD_OUTPUT_HANDLE
          self.hconsole = kernel32.GetStdHandle(handle)
          self._sbinfo = CONSOLE_SCREEN_BUFFER_INFO()
          self._csinfo = CONSOLE_CURSOR_INFO()
          kernel32.GetConsoleCursorInfo(self.hconsole, byref(self._csinfo))
          # just to double check that the console is usable
          self._orig_sbinfo = CONSOLE_SCREEN_BUFFER_INFO()
          r = kernel32.GetConsoleScreenBufferInfo(self.hconsole, byref(self._orig_sbinfo))
          self._isatty = r == 1

      def screen_buffer_info(self):
          """
          Updates self._sbinfo and returns it
          """
          kernel32.GetConsoleScreenBufferInfo(self.hconsole, byref(self._sbinfo))
          return self._sbinfo

      @staticmethod
      def _clamp(value, size):
          """Restrict a 0-based coordinate to the valid range 0 .. size - 1."""
          return max(0, min(value, size - 1))

      @staticmethod
      def _line_span(mode, cursor_x, width):
          """Return (start column, cell count) erased by 'K' for *mode*."""
          if mode == 1: # beginning of line up to and including the cursor
              return 0, cursor_x + 1
          if mode == 2: # entire line
              return 0, width
          return cursor_x, width - cursor_x # cursor to end of line

      @staticmethod
      def _screen_span(mode, cursor_x, cursor_y, width, height):
          """Return (start column, start row, cell count) erased by 'J'."""
          if mode == 1: # beginning of buffer up to and including the cursor
              return 0, 0, cursor_y * width + cursor_x + 1
          if mode == 2: # entire buffer
              return 0, 0, width * height
          # rest of the current row plus every row below it
          return cursor_x, cursor_y, (width - cursor_x) + width * (height - cursor_y - 1)

      def clear_line(self, param):
          """
          Erase part of the current line: 0/empty = cursor to end of line,
          1 = start of line to cursor, 2 = whole line.
          """
          mode = to_int(param, 0)
          sbinfo = self.screen_buffer_info()
          start_x, line_length = self._line_span(mode, sbinfo.CursorPosition.X, sbinfo.Size.X)
          line_start = COORD(start_x, sbinfo.CursorPosition.Y)
          chars_written = c_ulong()
          kernel32.FillConsoleOutputCharacterW(self.hconsole, c_wchar(' '), line_length, line_start, byref(chars_written))
          kernel32.FillConsoleOutputAttribute(self.hconsole, sbinfo.Attributes, line_length, line_start, byref(chars_written))

      def clear_screen(self, param):
          """
          Erase part of the screen buffer: 0/empty = cursor to end,
          1 = start to cursor, 2 = everything (the cursor is also sent home).
          """
          mode = to_int(param, 0)
          sbinfo = self.screen_buffer_info()
          start_x, start_y, clear_length = self._screen_span(
              mode, sbinfo.CursorPosition.X, sbinfo.CursorPosition.Y, sbinfo.Size.X, sbinfo.Size.Y)
          clear_start = COORD(start_x, start_y)
          if mode == 2:
              kernel32.SetConsoleCursorPosition(self.hconsole, clear_start)
          chars_written = c_ulong()
          kernel32.FillConsoleOutputCharacterW(self.hconsole, c_wchar(' '), clear_length, clear_start, byref(chars_written))
          kernel32.FillConsoleOutputAttribute(self.hconsole, sbinfo.Attributes, clear_length, clear_start, byref(chars_written))

      def push_cursor(self, param):
          """Remember the current cursor position on the history stack."""
          pos = self.screen_buffer_info().CursorPosition
          # Store a copy: the field object shares memory with self._sbinfo and
          # would be overwritten by the next call to screen_buffer_info().
          self.cursor_history.append(COORD(pos.X, pos.Y))

      def pop_cursor(self, param):
          """Move the cursor back to the most recently remembered position."""
          if self.cursor_history:
              old_pos = self.cursor_history.pop()
              kernel32.SetConsoleCursorPosition(self.hconsole, old_pos)

      def set_cursor(self, param):
          """Place the cursor at the 1-based "row;column" given in *param*."""
          y, sep, x = param.partition(';')
          x = to_int(x, 1) - 1
          y = to_int(y, 1) - 1
          sbinfo = self.screen_buffer_info()
          new_pos = COORD(
              self._clamp(x, sbinfo.Size.X),
              self._clamp(y, sbinfo.Size.Y)
          )
          kernel32.SetConsoleCursorPosition(self.hconsole, new_pos)

      def set_column(self, param):
          """Place the cursor at the 1-based column *param* of the current row."""
          x = to_int(param, 1) - 1
          sbinfo = self.screen_buffer_info()
          new_pos = COORD(
              self._clamp(x, sbinfo.Size.X),
              sbinfo.CursorPosition.Y
          )
          kernel32.SetConsoleCursorPosition(self.hconsole, new_pos)

      def move_cursor(self, x_offset=0, y_offset=0):
          """Move the cursor by the given offsets, staying inside the buffer."""
          sbinfo = self.screen_buffer_info()
          new_pos = COORD(
              self._clamp(sbinfo.CursorPosition.X + x_offset, sbinfo.Size.X),
              self._clamp(sbinfo.CursorPosition.Y + y_offset, sbinfo.Size.Y)
          )
          kernel32.SetConsoleCursorPosition(self.hconsole, new_pos)

      def move_up(self, param):
          self.move_cursor(y_offset = -to_int(param, 1))

      def move_down(self, param):
          self.move_cursor(y_offset = to_int(param, 1))

      def move_left(self, param):
          self.move_cursor(x_offset = -to_int(param, 1))

      def move_right(self, param):
          self.move_cursor(x_offset = to_int(param, 1))

      def next_line(self, param):
          """Go to the first column, *param* rows further down (default 1)."""
          sbinfo = self.screen_buffer_info()
          self.move_cursor(
              x_offset = -sbinfo.CursorPosition.X,
              y_offset = to_int(param, 1)
          )

      def prev_line(self, param):
          """Go to the first column, *param* rows further up (default 1)."""
          sbinfo = self.screen_buffer_info()
          self.move_cursor(
              x_offset = -sbinfo.CursorPosition.X,
              y_offset = -to_int(param, 1)
          )

      def rgb2bgr(self, c):
          """Swap bit 0 and bit 2 of a 3-bit color (ANSI order to console order)."""
          return ((c&1) << 2) | (c&2) | ((c&4)>>2)

      def set_color(self, param):
          """Apply the ';'-separated attribute codes in *param* to the console."""
          cols = param.split(';')
          sbinfo = self.screen_buffer_info()
          attr = sbinfo.Attributes
          for c in cols:
              c = to_int(c, 0)
              if 29 < c < 38: # fgcolor
                  attr = (attr & 0xfff0) | self.rgb2bgr(c - 30)
              elif 39 < c < 48: # bgcolor
                  attr = (attr & 0xff0f) | (self.rgb2bgr(c - 40) << 4)
              elif c == 0: # reset
                  attr = self._orig_sbinfo.Attributes
              elif c == 1: # strong
                  attr |= 0x08
              elif c == 4: # no console equivalent -> bg intensity
                  attr |= 0x80
              elif c == 7: # negative
                  # keep both intensity bits, exchange the fg and bg colors
                  attr = (attr & 0xff88) | ((attr & 0x70) >> 4) | ((attr & 0x07) << 4)
          kernel32.SetConsoleTextAttribute(self.hconsole, attr)

      def show_cursor(self,param):
          """Make the cursor visible (the parameter is ignored)."""
          self._csinfo.bVisible = 1
          kernel32.SetConsoleCursorInfo(self.hconsole, byref(self._csinfo))

      def hide_cursor(self,param):
          """Make the cursor invisible (the parameter is ignored)."""
          self._csinfo.bVisible = 0
          kernel32.SetConsoleCursorInfo(self.hconsole, byref(self._csinfo))

      # Final letter of an escape sequence -> handler, called as handler(self, param)
      ansi_command_table = {
          'A': move_up,
          'B': move_down,
          'C': move_right,
          'D': move_left,
          'E': next_line,
          'F': prev_line,
          'G': set_column,
          'H': set_cursor,
          'f': set_cursor,
          'J': clear_screen,
          'K': clear_line,
          'h': show_cursor,
          'l': hide_cursor,
          'm': set_color,
          's': push_cursor,
          'u': pop_cursor,
      }
      # Match either the escape sequence or text not containing escape sequence
      ansi_tokens = re.compile(r'(?:\x1b\[([0-9?;]*)([a-zA-Z])|([^\x1b]+))')

      def write(self, text):
          """
          Interpret *text*: run the escape sequences and print the rest.
          Sequences whose final letter is not in ansi_command_table are dropped.
          """
          # Acquire outside the try block: if acquire() itself fails, the
          # finally clause must not release a lock that was never taken.
          wlock.acquire()
          try:
              if self._isatty:
                  for param, cmd, txt in self.ansi_tokens.findall(text):
                      if cmd:
                          cmd_func = self.ansi_command_table.get(cmd)
                          if cmd_func:
                              cmd_func(self, param)
                      else:
                          self.writeconsole(txt)
              else:
                  # no support for colors in the console, just output the text:
                  # eclipse or msys may be able to interpret the escape sequences
                  self.stream.write(text)
          finally:
              wlock.release()

      def writeconsole(self, txt):
          """
          Send *txt* to the console in chunks, halving the chunk size after
          each failed call. Gives up once a failure leaves no smaller chunk
          size to try.
          """
          chars_written = c_ulong()
          writeconsole = kernel32.WriteConsoleA
          if isinstance(txt, _type):
              writeconsole = kernel32.WriteConsoleW
          # MSDN says that there is a shared buffer of 64 KB for the console
          # writes. Attempt to not get ERROR_NOT_ENOUGH_MEMORY, see waf issue #746
          done = 0
          todo = len(txt)
          chunk = 32<<10
          while todo != 0:
              doing = min(chunk, todo)
              buf = txt[done:done+doing]
              r = writeconsole(self.hconsole, buf, doing, byref(chars_written), None)
              if r == 0:
                  chunk >>= 1
                  if chunk == 0:
                      # a zero-sized chunk can never make progress; stop
                      # here instead of looping forever
                      break
                  continue
              done += doing
              todo -= doing

      def fileno(self):
          """Return the file descriptor of the wrapped stream."""
          return self.stream.fileno()

      def flush(self):
          """Do nothing; kept so the object can stand in for a stream."""
          pass

      def isatty(self):
          """Tell whether the console handle answered the buffer query in __init__."""
          return self._isatty
  if sys.stdout.isatty() or sys.stderr.isatty():
      # Use the console behind stdout when it is a terminal, else stderr's
      if sys.stdout.isatty():
          handle = STD_OUTPUT_HANDLE
      else:
          handle = STD_ERROR_HANDLE
      console = kernel32.GetStdHandle(handle)
      sbinfo = CONSOLE_SCREEN_BUFFER_INFO()

      def get_term_cols():
          """
          Return the console buffer width minus one, re-read on every call.
          """
          kernel32.GetConsoleScreenBufferInfo(console, byref(sbinfo))
          # Issue 1401 - the progress bar cannot reach the last character
          width = sbinfo.Size.X
          return width - 1
  # just try and see
  try:
      import struct, fcntl, termios
  except ImportError:
      pass
  else:
      if (sys.stdout.isatty() or sys.stderr.isatty()) and os.environ.get('TERM', '') not in ('dumb', 'emacs'):
          # A conditional expression instead of "cond and a or b": the latter
          # falls through to stderr whenever stdout's descriptor is 0.
          FD = sys.stdout.fileno() if sys.stdout.isatty() else sys.stderr.fileno()

          def fun():
              """
              Return the second of the four unsigned shorts that the
              TIOCGWINSZ ioctl reports for FD (the terminal column count).
              """
              return struct.unpack("HHHH", fcntl.ioctl(FD, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0)))[1]

          try:
              fun()
          except Exception:
              # best-effort probe: when the ioctl is not usable, any
              # get_term_cols defined earlier is left untouched
              pass
          else:
              get_term_cols = fun