tracer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package zk
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. )
  9. var (
  10. requests = make(map[int32]int32) // Map of Xid -> Opcode
  11. requestsLock = &sync.Mutex{}
  12. )
  13. func trace(conn1, conn2 net.Conn, client bool) {
  14. defer conn1.Close()
  15. defer conn2.Close()
  16. buf := make([]byte, 10*1024)
  17. init := true
  18. for {
  19. _, err := io.ReadFull(conn1, buf[:4])
  20. if err != nil {
  21. fmt.Println("1>", client, err)
  22. return
  23. }
  24. blen := int(binary.BigEndian.Uint32(buf[:4]))
  25. _, err = io.ReadFull(conn1, buf[4:4+blen])
  26. if err != nil {
  27. fmt.Println("2>", client, err)
  28. return
  29. }
  30. var cr interface{}
  31. opcode := int32(-1)
  32. readHeader := true
  33. if client {
  34. if init {
  35. cr = &connectRequest{}
  36. readHeader = false
  37. } else {
  38. xid := int32(binary.BigEndian.Uint32(buf[4:8]))
  39. opcode = int32(binary.BigEndian.Uint32(buf[8:12]))
  40. requestsLock.Lock()
  41. requests[xid] = opcode
  42. requestsLock.Unlock()
  43. cr = requestStructForOp(opcode)
  44. if cr == nil {
  45. fmt.Printf("Unknown opcode %d\n", opcode)
  46. }
  47. }
  48. } else {
  49. if init {
  50. cr = &connectResponse{}
  51. readHeader = false
  52. } else {
  53. xid := int32(binary.BigEndian.Uint32(buf[4:8]))
  54. zxid := int64(binary.BigEndian.Uint64(buf[8:16]))
  55. errnum := int32(binary.BigEndian.Uint32(buf[16:20]))
  56. if xid != -1 || zxid != -1 {
  57. requestsLock.Lock()
  58. found := false
  59. opcode, found = requests[xid]
  60. if !found {
  61. opcode = 0
  62. }
  63. delete(requests, xid)
  64. requestsLock.Unlock()
  65. } else {
  66. opcode = opWatcherEvent
  67. }
  68. cr = responseStructForOp(opcode)
  69. if cr == nil {
  70. fmt.Printf("Unknown opcode %d\n", opcode)
  71. }
  72. if errnum != 0 {
  73. cr = &struct{}{}
  74. }
  75. }
  76. }
  77. opname := "."
  78. if opcode != -1 {
  79. opname = opNames[opcode]
  80. }
  81. if cr == nil {
  82. fmt.Printf("%+v %s %+v\n", client, opname, buf[4:4+blen])
  83. } else {
  84. n := 4
  85. hdrStr := ""
  86. if readHeader {
  87. var hdr interface{}
  88. if client {
  89. hdr = &requestHeader{}
  90. } else {
  91. hdr = &responseHeader{}
  92. }
  93. if n2, err := decodePacket(buf[n:n+blen], hdr); err != nil {
  94. fmt.Println(err)
  95. } else {
  96. n += n2
  97. }
  98. hdrStr = fmt.Sprintf(" %+v", hdr)
  99. }
  100. if _, err := decodePacket(buf[n:n+blen], cr); err != nil {
  101. fmt.Println(err)
  102. }
  103. fmt.Printf("%+v %s%s %+v\n", client, opname, hdrStr, cr)
  104. }
  105. init = false
  106. written, err := conn2.Write(buf[:4+blen])
  107. if err != nil {
  108. fmt.Println("3>", client, err)
  109. return
  110. } else if written != 4+blen {
  111. fmt.Printf("Written != read: %d != %d\n", written, blen)
  112. return
  113. }
  114. }
  115. }
  116. func handleConnection(addr string, conn net.Conn) {
  117. zkConn, err := net.Dial("tcp", addr)
  118. if err != nil {
  119. fmt.Println(err)
  120. return
  121. }
  122. go trace(conn, zkConn, true)
  123. trace(zkConn, conn, false)
  124. }
  125. func StartTracer(listenAddr, serverAddr string) {
  126. ln, err := net.Listen("tcp", listenAddr)
  127. if err != nil {
  128. panic(err)
  129. }
  130. for {
  131. conn, err := ln.Accept()
  132. if err != nil {
  133. fmt.Println(err)
  134. continue
  135. }
  136. go handleConnection(serverAddr, conn)
  137. }
  138. }