|
- local bit = require "bit"
- local buffer = require "resty.t1k.buffer"
- local consts = require "resty.t1k.constants"
- local file = require "resty.t1k.file"
- local log = require "resty.t1k.log"
- local utils = require "resty.t1k.utils"
- local uuid = require "resty.t1k.uuid"
-- Module table handed back to whoever requires this file.
local _M = {}
_M._VERSION = "1.0.0"

-- Library functions cached as locals.
local bor = bit.bor
local byte, char = string.byte, string.char
local fmt, sub = string.format, string.sub
local concat = table.concat

-- Handles into the ngx API.
local ngx = ngx
local nlog = ngx.log
local ngx_req, ngx_socket, ngx_var = ngx.req, ngx.socket, ngx.var

-- Log helpers; their results are passed straight to nlog.
local warn_fmt, debug_fmt = log.warn_fmt, log.debug_fmt

-- Keys of the "Key:value\n" lines written into the extra section.
local KEY_EXTRA_UUID = "UUID"
local KEY_EXTRA_LOCAL_ADDR = "LocalAddr"
local KEY_EXTRA_LOCAL_PORT = "LocalPort"
local KEY_EXTRA_REMOTE_ADDR = "RemoteAddr"
local KEY_EXTRA_REMOTE_PORT = "RemotePort"
local KEY_EXTRA_SCHEME = "Scheme"
local KEY_EXTRA_SERVER_NAME = "ServerName"
local KEY_EXTRA_PROXY_NAME = "ProxyName"
local KEY_EXTRA_REQ_BEGIN_TIME = "ReqBeginTime"
local KEY_EXTRA_HAS_RSP_IF_OK = "HasRspIfOK"
local KEY_EXTRA_HAS_RSP_IF_BLOCK = "HasRspIfBlock"

-- Tag bytes for the first (header) and last (extra) sections of a request.
local TAG_HEAD_WITH_MASK_FIRST = bor(consts.TAG_HEAD, consts.MASK_FIRST)
local TAG_EXTRA_WITH_MASK_LAST = bor(consts.TAG_EXTRA, consts.MASK_LAST)

-- Pre-encoded version section: tag byte, encoded length, then the payload.
local T1K_PROTO = "Proto:2\n"
local T1K_PROTO_DATA = fmt("%s%s%s", char(consts.TAG_VERSION), utils.int_to_char_length(#T1K_PROTO), T1K_PROTO)
--- Read the client request body, keeping at most `opt_req_body_size` bytes.
-- @param opt_req_body_size maximum number of body bytes to return
-- @return `true, nil, nil` when there is no body, `true, nil, body` on
--         success, `ok, err, nil` when the temporary body file cannot be read
local function read_request_body(opt_req_body_size)
    local declared_length = tonumber(ngx_var.http_content_length) or 0
    local encoding_header = ngx_var.http_transfer_encoding

    -- No Content-Length and no Transfer-Encoding: nothing to read.
    if declared_length == 0 and not encoding_header then
        return true, nil, nil
    end

    ngx_req.read_body()

    -- First choice: the body held in memory.
    local in_memory = ngx_req.get_body_data()
    if in_memory then
        local total = #in_memory
        if total <= opt_req_body_size then
            return true, nil, in_memory
        end
        nlog(debug_fmt("request body is too long: %d bytes, cut to %d bytes", total, opt_req_body_size))
        return true, nil, sub(in_memory, 1, opt_req_body_size)
    end

    -- Otherwise look for a temporary file holding the body.
    local body_path = ngx_req.get_body_file()
    if not body_path then
        return true, nil, nil
    end

    local ok, err, from_file = file.read(body_path, opt_req_body_size)
    if ok then
        return true, nil, from_file
    end

    err = fmt("failed to read temporary file %s: %s", body_path, err)
    return ok, err, nil
end
--- Resolve the client address.
-- When `remote_addr_var` is given, the nginx variable of that name is passed
-- to utils.get_indexed_element together with `remote_addr_idx`; a falsy
-- result (or no variable name at all) falls back to ngx.var.remote_addr.
-- @param remote_addr_var name of an nginx variable, or nil
-- @param remote_addr_idx index forwarded to utils.get_indexed_element
-- @return the selected address, or ngx.var.remote_addr
local function get_remote_addr(remote_addr_var, remote_addr_idx)
    if not remote_addr_var then
        return ngx_var.remote_addr
    end

    local picked = utils.get_indexed_element(ngx_var[remote_addr_var], remote_addr_idx)
    if picked then
        return picked
    end
    return ngx_var.remote_addr
end
--- Turn a header value into a single string.
-- Tables are joined with ", "; anything else goes through tostring.
-- @param v header value (string, table of strings, or any other value)
-- @return string
local function parse_v(v)
    if type(v) ~= "table" then
        return tostring(v)
    end
    return concat(v, ", ")
end
--- Produce the header section for the current request.
-- Requests with an HTTP version below 2.0 reuse the raw header string from
-- ngx.req.raw_header(). For other versions a request line and one
-- "name: value" line per header are assembled into a buffer.
-- @return `true, nil, header` on success, `nil, err, nil` when
--         ngx.req.get_headers reports an error
local function build_header()
    local version = ngx_req.http_version()
    if version < 2.0 then
        return true, nil, ngx_req.raw_header()
    end

    local headers, err = ngx_req.get_headers(0, true)
    if err then
        return nil, fmt("failed to call ngx_req.get_headers: %s", err), nil
    end

    local out = buffer:new()

    -- Request line first, then every header, then the blank terminator line.
    local request_line = fmt("%s %s HTTP/%.1f\r\n", ngx_req.get_method(), ngx_var.request_uri, version)
    out:add(request_line)

    for name, value in pairs(headers) do
        local line = fmt("%s: %s\r\n", name, parse_v(value))
        out:add(line)
    end

    out:add("\r\n")
    return true, nil, out
end
--- Fetch the request body that will be sent as the body section.
-- `opts.req_body_size` is multiplied by 1024 to obtain the byte limit handed
-- to read_request_body.
-- @param opts table; reads `req_body_size`
-- @return `true, nil, body` on success (body may be nil), `ok, err, nil` on failure
local function build_body(opts)
    local limit_bytes = opts.req_body_size * 1024

    local ok, err, body = read_request_body(limit_bytes)
    if ok then
        return true, nil, body
    end
    return ok, err, nil
end
--- Build the extra section: one "Key:value\n" line per metadata item
-- describing the current request and connection.
-- @param opts table; reads `remote_addr_var` and `remote_addr_idx`
-- @return `true, nil, buffer` on success, `nil, err, nil` when an address or
--         port cannot be obtained
local function build_extra(opts)
    local src_ip = get_remote_addr(opts.remote_addr_var, opts.remote_addr_idx)
    if not src_ip then
        -- remote_addr_var / remote_addr_idx may be unset (get_remote_addr
        -- tolerates a nil variable name). "%d" raises on a nil argument, so
        -- both are rendered through tostring to keep this path from throwing.
        local err = fmt("failed to get remote_addr, var: %s, idx %s",
                tostring(opts.remote_addr_var), tostring(opts.remote_addr_idx))
        return nil, err, nil
    end

    local src_port = ngx_var.remote_port
    if not src_port then
        return nil, "failed to get ngx_var.remote_port", nil
    end

    local local_ip = ngx_var.server_addr
    if not local_ip then
        return nil, "failed to get ngx_var.server_addr", nil
    end

    local local_port = ngx_var.server_port
    if not local_port then
        return nil, "failed to get ngx_var.server_port", nil
    end

    local extra = buffer:new({
        KEY_EXTRA_UUID, ":", uuid.generate_v4(), "\n",
        KEY_EXTRA_REMOTE_ADDR, ":", src_ip, "\n",
        KEY_EXTRA_REMOTE_PORT, ":", src_port, "\n",
        KEY_EXTRA_LOCAL_ADDR, ":", local_ip, "\n",
        KEY_EXTRA_LOCAL_PORT, ":", local_port, "\n",
        KEY_EXTRA_SCHEME, ":", ngx_var.scheme, "\n",
        KEY_EXTRA_SERVER_NAME, ":", ngx_var.server_name, "\n",
        KEY_EXTRA_PROXY_NAME, ":", ngx_var.hostname, "\n",
        -- Request start time scaled by 1,000,000 and printed without decimals.
        KEY_EXTRA_REQ_BEGIN_TIME, ":", fmt("%.0f", ngx_req.start_time() * 1000000), "\n",
        KEY_EXTRA_HAS_RSP_IF_OK, ":n\n",
        KEY_EXTRA_HAS_RSP_IF_BLOCK, ":n\n"
    })

    return true, nil, extra
end
--- Send `data` on `sock` and normalise the result.
-- @param sock object exposing a `send` method
-- @param data value passed unchanged to `sock:send`
-- @return `true, nil` when send reports success, otherwise the falsy result
--         and error returned by `sock:send`
local function do_send(sock, data)
    local sent, err = sock:send(data)
    if sent then
        return true, nil
    end
    return sent, err
end
--- Read response packets from the t1k server until the parser marks one as
-- the final packet, collecting each non-empty payload under its tag.
-- @param s connected socket
-- @param srv server description used in error messages
-- @return `true, nil, payloads_by_tag` on success, `nil, err, nil` on failure
local function receive_data(s, srv)
    local sections = {}
    local expect_first = true

    while true do
        local head, err = s:receive(consts.T1K_HEADER_SIZE)
        if err then
            return nil, fmt("failed to receive info packet from t1k server %s: %s", srv, err), nil
        end
        if not head then
            return nil, fmt("empty packet from t1k server %s", srv), nil
        end

        -- Only the very first packet is checked for the MASK_FIRST flag.
        if expect_first then
            if not utils.is_mask_first(byte(head, 1, 1)) then
                return nil, fmt("first packet is not MASK_FIRST from t1k server %s", srv), nil
            end
            expect_first = false
        end

        local is_last, tag, length = utils.packet_parser(head)
        if length > 0 then
            local payload, recv_err = s:receive(length)
            if not payload or #payload ~= length then
                return nil, fmt("failed to receive payload from t1k server %s: %s", srv, recv_err), nil
            end
            sections[tag] = payload
        end

        if is_last then
            break
        end
    end

    return true, nil, sections
end
--- Create a TCP cosocket and connect it to the configured t1k server.
-- With `opts.uds` set, `opts.host` alone is passed to connect; otherwise
-- `opts.host` and `opts.port` are used. When the reuse count is 0, the socket
-- options keepalive, reuseaddr and tcp-nodelay are set on a best-effort
-- basis (failures are only logged).
-- @param opts table; reads connect_timeout, send_timeout, read_timeout, uds,
--        host and port
-- @return `true, nil, sock, server` on success, `ok, err, nil, nil` on failure
local function get_socket(opts)
    local sock, err = ngx_socket.tcp()
    if not sock then
        return nil, fmt("failed to create socket: %s", err), nil, nil
    end

    sock:settimeouts(opts.connect_timeout, opts.send_timeout, opts.read_timeout)

    local ok, server
    if opts.uds then
        server = opts.host
        ok, err = sock:connect(opts.host)
    else
        server = fmt("%s:%d", opts.host, opts.port)
        ok, err = sock:connect(opts.host, opts.port)
    end

    if not ok then
        sock:close()
        return ok, fmt("failed to connect to t1k server %s: %s", server, err), nil, nil
    end

    nlog(debug_fmt("successfully connected to t1k server %s", server))

    local reused
    reused, err = sock:getreusedtimes()
    if not reused then
        nlog(warn_fmt("failed to get reused times from t1k server %s: %s", server, err))
    end

    -- A reuse count of 0 means the options have not been applied yet.
    if reused == 0 then
        local wanted = {
            { "keepalive", "failed to set keepalive for t1k server %s: %s" },
            { "reuseaddr", "failed to set reuseaddr for t1k server %s: %s" },
            { "tcp-nodelay", "failed to set tcp-nodelay for t1k server %s: %s" },
        }
        for _, entry in ipairs(wanted) do
            ok, err = sock:setoption(entry[1], true)
            if not ok then
                nlog(warn_fmt(entry[2], server, err))
            end
        end
    end

    return true, nil, sock, server
end
--- Send one request (header, optional body, version + extra sections) to the
-- t1k server and read back its response.
-- Every failure after the socket has been obtained closes the socket before
-- returning, so a connection left mid-exchange is not kept open.
-- @param opts table; forwarded to get_socket, also reads keepalive_timeout
--        and keepalive_size
-- @param header header section; must provide a `len` method
-- @param body body section or nil; when nil no body packet is sent
-- @param extra extra section; must provide a `len` method
-- @return `true, nil, payloads_by_tag` on success, `ok, err, nil` on failure
local function do_socket(opts, header, body, extra)
    local ok, err
    local t, sock, server

    ok, err, sock, server = get_socket(opts)
    if not ok then
        err = fmt("failed to get socket: %s", err)
        return ok, err, nil
    end

    ok, err = do_send(sock, { char(TAG_HEAD_WITH_MASK_FIRST), utils.int_to_char_length(header:len()), header })
    if not ok then
        sock:close()
        err = fmt("failed to send header data to t1k server %s: %s", server, err)
        return ok, err, nil
    end

    if body ~= nil then
        ok, err = do_send(sock, { char(consts.TAG_BODY), utils.int_to_char_length(body:len()), body })
        if not ok then
            sock:close()
            err = fmt("failed to send body data to t1k server %s: %s", server, err)
            return ok, err, nil
        end
    end

    -- The pre-encoded version section goes out in the same send call,
    -- immediately ahead of the final (MASK_LAST) extra section.
    ok, err = do_send(sock, { T1K_PROTO_DATA, char(TAG_EXTRA_WITH_MASK_LAST), utils.int_to_char_length(extra:len()), extra })
    if not ok then
        sock:close()
        err = fmt("failed to send extra data to t1k server %s: %s", server, err)
        return ok, err, nil
    end

    ok, err, t = receive_data(sock, server)
    if not ok then
        -- The response was not fully consumed; close the socket here just
        -- like the send-failure branches do instead of leaving it open.
        sock:close()
        return ok, err, nil
    end

    -- Hand the connection back to the keepalive pool; if that fails, log it
    -- and close the socket explicitly.
    ok, err = sock:setkeepalive(opts.keepalive_timeout, opts.keepalive_size)
    if not ok then
        nlog(warn_fmt("failed to set keepalive: %s", err))
        sock:close()
    end

    return true, nil, t
end
--- Build the header, body and extra sections for the current request, send
-- them to the t1k server and return the parsed response.
-- When `opts.mode` equals consts.MODE_BLOCK and the response carries a
-- TAG_EXTRA_HEADER payload, that payload is stored in
-- ngx.ctx.t1k_extra_header.
-- @param opts table of options forwarded to the build and socket helpers
-- @return `true, nil, result` on success, where result has the fields
--         `action`, `status` and `event_id`; `ok, err, nil` on failure
function _M.do_request(opts)
    local ok, err, header = build_header(opts)
    if not ok then
        return ok, err, nil
    end

    local body
    ok, err, body = build_body(opts)
    if not ok then
        return ok, err, nil
    end

    local extra
    ok, err, extra = build_extra(opts)
    if not ok then
        return ok, err, nil
    end

    local response
    ok, err, response = do_socket(opts, header, body, extra)
    if not ok then
        return ok, err, nil
    end

    if opts.mode == consts.MODE_BLOCK then
        local extra_header = response[consts.TAG_EXTRA_HEADER]
        if extra_header then
            ngx.ctx.t1k_extra_header = extra_header
        end
    end

    -- Payloads are looked up by the tag under which they were received.
    return true, nil, {
        action = response[consts.TAG_HEAD],
        status = response[consts.TAG_BODY],
        event_id = utils.get_event_id(response[consts.TAG_EXTRA_BODY]),
    }
end
- return _M
|