request.lua 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. local bit = require "bit"
  2. local buffer = require "resty.t1k.buffer"
  3. local consts = require "resty.t1k.constants"
  4. local file = require "resty.t1k.file"
  5. local log = require "resty.t1k.log"
  6. local utils = require "resty.t1k.utils"
  7. local uuid = require "resty.t1k.uuid"
  8. local _M = {
  9. _VERSION = '1.0.0',
  10. }
  11. local bor = bit.bor
  12. local byte = string.byte
  13. local char = string.char
  14. local fmt = string.format
  15. local sub = string.sub
  16. local concat = table.concat
  17. local ngx = ngx
  18. local nlog = ngx.log
  19. local ngx_req = ngx.req
  20. local ngx_socket = ngx.socket
  21. local ngx_var = ngx.var
  22. local warn_fmt = log.warn_fmt
  23. local debug_fmt = log.debug_fmt
  24. local KEY_EXTRA_UUID = "UUID"
  25. local KEY_EXTRA_LOCAL_ADDR = "LocalAddr"
  26. local KEY_EXTRA_LOCAL_PORT = "LocalPort"
  27. local KEY_EXTRA_REMOTE_ADDR = "RemoteAddr"
  28. local KEY_EXTRA_REMOTE_PORT = "RemotePort"
  29. local KEY_EXTRA_SCHEME = "Scheme"
  30. local KEY_EXTRA_SERVER_NAME = "ServerName"
  31. local KEY_EXTRA_PROXY_NAME = "ProxyName"
  32. local KEY_EXTRA_REQ_BEGIN_TIME = "ReqBeginTime"
  33. local KEY_EXTRA_HAS_RSP_IF_OK = "HasRspIfOK"
  34. local KEY_EXTRA_HAS_RSP_IF_BLOCK = "HasRspIfBlock"
  35. local TAG_HEAD_WITH_MASK_FIRST = bor(consts.TAG_HEAD, consts.MASK_FIRST)
  36. local TAG_EXTRA_WITH_MASK_LAST = bor(consts.TAG_EXTRA, consts.MASK_LAST)
  37. local T1K_PROTO = "Proto:2\n"
  38. local T1K_PROTO_DATA = fmt("%s%s%s", char(consts.TAG_VERSION), utils.int_to_char_length(#T1K_PROTO), T1K_PROTO)
  39. local function read_request_body(opt_req_body_size)
  40. local ok, err
  41. local req_body, req_body_size
  42. local content_length = tonumber(ngx_var.http_content_length) or 0
  43. local transfer_encoding = ngx_var.http_transfer_encoding
  44. if content_length == 0 and not transfer_encoding then
  45. return true, nil, nil
  46. end
  47. ngx_req.read_body()
  48. req_body = ngx_req.get_body_data()
  49. if req_body then
  50. req_body_size = #req_body
  51. if req_body_size > opt_req_body_size then
  52. nlog(debug_fmt("request body is too long: %d bytes, cut to %d bytes", req_body_size, opt_req_body_size))
  53. req_body = sub(req_body, 1, opt_req_body_size)
  54. end
  55. return true, nil, req_body
  56. end
  57. local path = ngx_req.get_body_file()
  58. if not path then
  59. return true, nil, nil
  60. end
  61. ok, err, req_body = file.read(path, opt_req_body_size)
  62. if not ok then
  63. err = fmt("failed to read temporary file %s: %s", path, err)
  64. return ok, err, nil
  65. end
  66. return true, nil, req_body
  67. end
  68. local function get_remote_addr(remote_addr_var, remote_addr_idx)
  69. local addr
  70. if remote_addr_var then
  71. addr = utils.get_indexed_element(ngx_var[remote_addr_var], remote_addr_idx)
  72. end
  73. return addr or ngx_var.remote_addr
  74. end
  75. local function parse_v(v)
  76. if type(v) == "table" then
  77. return concat(v, ", ")
  78. end
  79. return tostring(v)
  80. end
  81. local function build_header()
  82. local http_version = ngx_req.http_version()
  83. if http_version < 2.0 then
  84. return true, nil, ngx_req.raw_header()
  85. end
  86. local headers, err = ngx_req.get_headers(0, true)
  87. if err then
  88. err = fmt("failed to call ngx_req.get_headers: %s", err)
  89. return nil, err, nil
  90. end
  91. local buf = buffer:new()
  92. buf:add(fmt("%s %s HTTP/%.1f\r\n", ngx_req.get_method(), ngx_var.request_uri, http_version))
  93. for k, v in pairs(headers) do
  94. buf:add(fmt("%s: %s\r\n", k, parse_v(v)))
  95. end
  96. buf:add("\r\n")
  97. return true, nil, buf
  98. end
  99. local function build_body(opts)
  100. local ok, err
  101. local body
  102. local req_body_size = opts.req_body_size * 1024
  103. ok, err, body = read_request_body(req_body_size)
  104. if not ok then
  105. return ok, err, nil
  106. end
  107. return true, nil, body
  108. end
  109. local function build_extra(opts)
  110. local err
  111. local src_ip = get_remote_addr(opts.remote_addr_var, opts.remote_addr_idx)
  112. if not src_ip then
  113. err = fmt("failed to get remote_addr, var: %s, idx %d", opts.remote_addr_var, opts.remote_addr_idx)
  114. return nil, err
  115. end
  116. local src_port = ngx_var.remote_port
  117. if not src_port then
  118. err = "failed to get ngx_var.remote_port"
  119. return nil, err, nil
  120. end
  121. local local_ip = ngx_var.server_addr
  122. if not local_ip then
  123. err = "failed to get ngx_var.server_addr"
  124. return nil, err, nil
  125. end
  126. local local_port = ngx_var.server_port
  127. if not local_port then
  128. err = "failed to get ngx_var.server_port"
  129. return nil, err, nil
  130. end
  131. local extra = buffer:new({
  132. KEY_EXTRA_UUID, ":", uuid.generate_v4(), "\n",
  133. KEY_EXTRA_REMOTE_ADDR, ":", src_ip, "\n",
  134. KEY_EXTRA_REMOTE_PORT, ":", src_port, "\n",
  135. KEY_EXTRA_LOCAL_ADDR, ":", local_ip, "\n",
  136. KEY_EXTRA_LOCAL_PORT, ":", local_port, "\n",
  137. KEY_EXTRA_SCHEME, ":", ngx_var.scheme, "\n",
  138. KEY_EXTRA_SERVER_NAME, ":", ngx_var.server_name, "\n",
  139. KEY_EXTRA_PROXY_NAME, ":", ngx_var.hostname, "\n",
  140. KEY_EXTRA_REQ_BEGIN_TIME, ":", fmt("%.0f", ngx_req.start_time() * 1000000), "\n",
  141. KEY_EXTRA_HAS_RSP_IF_OK, ":n\n",
  142. KEY_EXTRA_HAS_RSP_IF_BLOCK, ":n\n"
  143. })
  144. return true, nil, extra
  145. end
  146. local function do_send(sock, data)
  147. local ok, err = sock:send(data)
  148. if not ok then
  149. return ok, err
  150. end
  151. return true, nil
  152. end
  153. local function receive_data(s, srv)
  154. local t = {}
  155. local ft = true
  156. local finished
  157. repeat
  158. local err
  159. local tag, length, packet, rsp_body
  160. packet, err = s:receive(consts.T1K_HEADER_SIZE)
  161. if err then
  162. err = fmt("failed to receive info packet from t1k server %s: %s", srv, err)
  163. return nil, err, nil
  164. end
  165. if not packet then
  166. err = fmt("empty packet from t1k server %s", srv)
  167. return nil, err, nil
  168. end
  169. if ft then
  170. if not utils.is_mask_first(byte(packet, 1, 1)) then
  171. err = fmt("first packet is not MASK_FIRST from t1k server %s", srv)
  172. return nil, err, nil
  173. end
  174. ft = false
  175. end
  176. finished, tag, length = utils.packet_parser(packet)
  177. if length > 0 then
  178. rsp_body, err = s:receive(length)
  179. if not rsp_body or #rsp_body ~= length then
  180. err = fmt("failed to receive payload from t1k server %s: %s", srv, err)
  181. return nil, err, nil
  182. end
  183. t[tag] = rsp_body
  184. end
  185. until (finished)
  186. return true, nil, t
  187. end
  188. local function get_socket(opts)
  189. local ok, err
  190. local count, sock, server
  191. sock, err = ngx_socket.tcp()
  192. if not sock then
  193. err = fmt("failed to create socket: %s", err)
  194. return nil, err, nil, nil
  195. end
  196. sock:settimeouts(opts.connect_timeout, opts.send_timeout, opts.read_timeout)
  197. if opts.uds then
  198. server = opts.host
  199. ok, err = sock:connect(opts.host)
  200. else
  201. server = fmt("%s:%d", opts.host, opts.port)
  202. ok, err = sock:connect(opts.host, opts.port)
  203. end
  204. if not ok then
  205. sock:close()
  206. err = fmt("failed to connect to t1k server %s: %s", server, err)
  207. return ok, err, nil, nil
  208. end
  209. nlog(debug_fmt("successfully connected to t1k server %s", server))
  210. count, err = sock:getreusedtimes()
  211. if not count then
  212. nlog(warn_fmt("failed to get reused times from t1k server %s: %s", server, err))
  213. end
  214. if count == 0 then
  215. ok, err = sock:setoption("keepalive", true)
  216. if not ok then
  217. nlog(warn_fmt("failed to set keepalive for t1k server %s: %s", server, err))
  218. end
  219. ok, err = sock:setoption("reuseaddr", true)
  220. if not ok then
  221. nlog(warn_fmt("failed to set reuseaddr for t1k server %s: %s", server, err))
  222. end
  223. ok, err = sock:setoption("tcp-nodelay", true)
  224. if not ok then
  225. nlog(warn_fmt("failed to set tcp-nodelay for t1k server %s: %s", server, err))
  226. end
  227. end
  228. return true, nil, sock, server
  229. end
  230. local function do_socket(opts, header, body, extra)
  231. local ok, err
  232. local t, sock, server
  233. ok, err, sock, server = get_socket(opts)
  234. if not ok then
  235. err = fmt("failed to get socket: %s", err)
  236. return ok, err, nil
  237. end
  238. ok, err = do_send(sock, { char(TAG_HEAD_WITH_MASK_FIRST), utils.int_to_char_length(header:len()), header })
  239. if not ok then
  240. sock:close()
  241. err = fmt("failed to send header data to t1k server %s: %s", server, err)
  242. return ok, err, nil
  243. end
  244. if body ~= nil then
  245. ok, err = do_send(sock, { char(consts.TAG_BODY), utils.int_to_char_length(body:len()), body })
  246. if not ok then
  247. sock:close()
  248. err = fmt("failed to send body data to t1k server %s: %s", server, err)
  249. return ok, err, nil
  250. end
  251. end
  252. ok, err = do_send(sock, { T1K_PROTO_DATA, char(TAG_EXTRA_WITH_MASK_LAST), utils.int_to_char_length(extra:len()), extra })
  253. if not ok then
  254. sock:close()
  255. err = fmt("failed to send extra data to t1k server %s: %s", server, err)
  256. return ok, err, nil
  257. end
  258. ok, err, t = receive_data(sock, server)
  259. if not ok then
  260. return ok, err, nil
  261. end
  262. ok, err = sock:setkeepalive(opts.keepalive_timeout, opts.keepalive_size)
  263. if not ok then
  264. nlog(warn_fmt("failed to set keepalive: %s", err))
  265. sock:close()
  266. end
  267. return true, nil, t
  268. end
  269. function _M.do_request(opts)
  270. local ok, err
  271. local header, body, extra, t
  272. ok, err, header = build_header(opts)
  273. if not ok then
  274. return ok, err, nil
  275. end
  276. ok, err, body = build_body(opts)
  277. if not ok then
  278. return ok, err, nil
  279. end
  280. ok, err, extra = build_extra(opts)
  281. if not ok then
  282. return ok, err, nil
  283. end
  284. ok, err, t = do_socket(opts, header, body, extra)
  285. if not ok then
  286. return ok, err, nil
  287. end
  288. if opts.mode == consts.MODE_BLOCK then
  289. local extra_header = t[consts.TAG_EXTRA_HEADER]
  290. if extra_header then
  291. ngx.ctx.t1k_extra_header = extra_header
  292. end
  293. end
  294. local result = {
  295. action = t[consts.TAG_HEAD],
  296. status = t[consts.TAG_BODY],
  297. event_id = utils.get_event_id(t[consts.TAG_EXTRA_BODY]),
  298. }
  299. return true, nil, result
  300. end
  301. return _M