requests.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package feed
  import (
  	"context"
  	"crypto/tls"
  	"encoding/json"
  	"encoding/xml"
  	"fmt"
  	"io"
  	"net/http"
  	"sync"
  	"time"
  	"unicode/utf8"
  )
  // defaultClientTimeout bounds every request made through the package-level
  // clients. http.Client.Timeout covers the whole exchange, including reading
  // the response body.
  const defaultClientTimeout = 5 * time.Second

  var (
  	// defaultClient is the shared client for certificate-verified requests.
  	// Its nil Transport means http.DefaultTransport is used.
  	defaultClient = &http.Client{
  		Timeout: defaultClientTimeout,
  	}

  	// insecureClientTransport skips TLS certificate verification. It is cloned
  	// from http.DefaultTransport so it keeps the same proxy (HTTP_PROXY /
  	// HTTPS_PROXY / NO_PROXY), dialer, TLS-handshake and connection-pool
  	// settings as defaultClient; a bare &http.Transport{} would silently ignore
  	// proxy environment variables for the insecure client only.
  	insecureClientTransport = newInsecureTransport()

  	// defaultInsecureClient is the counterpart of defaultClient that does not
  	// verify server certificates. NOTE(review): presumably used only for feeds
  	// where skipping verification was explicitly chosen; confirm at call sites.
  	defaultInsecureClient = &http.Client{
  		Timeout:   defaultClientTimeout,
  		Transport: insecureClientTransport,
  	}
  )

  // newInsecureTransport returns a copy of http.DefaultTransport with TLS
  // certificate verification disabled. http.DefaultTransport itself is not
  // modified.
  func newInsecureTransport() *http.Transport {
  	var transport *http.Transport
  	if base, ok := http.DefaultTransport.(*http.Transport); ok {
  		transport = base.Clone()
  	} else {
  		// DefaultTransport was replaced by some other RoundTripper; fall back
  		// to a fresh transport that still honours proxy environment variables.
  		transport = &http.Transport{Proxy: http.ProxyFromEnvironment}
  	}
  	// Deliberately insecure: this transport exists to skip verification.
  	transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
  	return transport
  }
  // RequestDoer is the one-method HTTP surface the decode helpers depend on.
  // *http.Client satisfies it.
  type RequestDoer interface {
  	// Do sends request and returns the server's response.
  	Do(request *http.Request) (*http.Response, error)
  }
  // addBrowserUserAgentHeader makes request identify itself as desktop Firefox,
  // replacing any User-Agent value already present on it.
  func addBrowserUserAgentHeader(request *http.Request) {
  	const firefoxUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0"
  	headers := request.Header
  	headers.Set("User-Agent", firefoxUserAgent)
  }
  // truncateString returns at most the first maxLen runes of s; a non-positive
  // maxLen yields "".
  //
  // It walks s rune by rune and stops at the limit instead of converting the
  // whole string to []rune, so truncating a large input (e.g. an error response
  // body) costs O(maxLen) and allocates nothing. The result is a substring of
  // s, so any invalid UTF-8 bytes in the kept prefix are returned unchanged.
  func truncateString(s string, maxLen int) string {
  	if maxLen <= 0 {
  		return ""
  	}
  	seen := 0
  	for byteIndex := range s {
  		if seen == maxLen {
  			return s[:byteIndex]
  		}
  		seen++
  	}
  	return s
  }
  37. func decodeJsonFromRequest[T any](client RequestDoer, request *http.Request) (T, error) {
  38. response, err := client.Do(request)
  39. var result T
  40. if err != nil {
  41. return result, err
  42. }
  43. defer response.Body.Close()
  44. body, err := io.ReadAll(response.Body)
  45. if err != nil {
  46. return result, err
  47. }
  48. if response.StatusCode != http.StatusOK {
  49. return result, fmt.Errorf(
  50. "unexpected status code %d for %s, response: %s",
  51. response.StatusCode,
  52. request.URL,
  53. truncateString(string(body), 256),
  54. )
  55. }
  56. err = json.Unmarshal(body, &result)
  57. if err != nil {
  58. return result, err
  59. }
  60. return result, nil
  61. }
  62. func decodeJsonFromRequestTask[T any](client RequestDoer) func(*http.Request) (T, error) {
  63. return func(request *http.Request) (T, error) {
  64. return decodeJsonFromRequest[T](client, request)
  65. }
  66. }
  67. // TODO: tidy up, these are a copy of the above but with a line changed
  68. func decodeXmlFromRequest[T any](client RequestDoer, request *http.Request) (T, error) {
  69. response, err := client.Do(request)
  70. var result T
  71. if err != nil {
  72. return result, err
  73. }
  74. defer response.Body.Close()
  75. body, err := io.ReadAll(response.Body)
  76. if err != nil {
  77. return result, err
  78. }
  79. if response.StatusCode != http.StatusOK {
  80. return result, fmt.Errorf(
  81. "unexpected status code %d for %s, response: %s",
  82. response.StatusCode,
  83. request.URL,
  84. truncateString(string(body), 256),
  85. )
  86. }
  87. err = xml.Unmarshal(body, &result)
  88. if err != nil {
  89. return result, err
  90. }
  91. return result, nil
  92. }
  93. func decodeXmlFromRequestTask[T any](client RequestDoer) func(*http.Request) (T, error) {
  94. return func(request *http.Request) (T, error) {
  95. return decodeXmlFromRequest[T](client, request)
  96. }
  97. }
  // workerPoolTask carries one input through the pool together with its
  // position in the job's data; once processed it also holds the task's
  // output and error.
  type workerPoolTask[I, O any] struct {
  	index  int // slot in workerPoolJob.data, reused for results and errors
  	input  I
  	output O
  	err    error
  }
  // workerPoolJob describes a batch for workerPoolDo: apply task to every
  // element of data using a pool of worker goroutines.
  type workerPoolJob[I, O any] struct {
  	data    []I                // inputs; results are reported by index
  	workers int                // requested number of worker goroutines
  	task    func(I) (O, error) // applied to each input independently
  	ctx     context.Context    // once done, no further inputs are dispatched
  }
  110. const defaultNumWorkers = 10
  111. func (job *workerPoolJob[I, O]) withWorkers(workers int) *workerPoolJob[I, O] {
  112. if workers == 0 {
  113. job.workers = defaultNumWorkers
  114. } else if workers > len(job.data) {
  115. job.workers = len(job.data)
  116. } else {
  117. job.workers = workers
  118. }
  119. return job
  120. }
  121. // func (job *workerPoolJob[I, O]) withContext(ctx context.Context) *workerPoolJob[I, O] {
  122. // if ctx != nil {
  123. // job.ctx = ctx
  124. // }
  125. // return job
  126. // }
  127. func newJob[I any, O any](task func(I) (O, error), data []I) *workerPoolJob[I, O] {
  128. return &workerPoolJob[I, O]{
  129. workers: defaultNumWorkers,
  130. task: task,
  131. data: data,
  132. ctx: context.Background(),
  133. }
  134. }
  135. func workerPoolDo[I any, O any](job *workerPoolJob[I, O]) ([]O, []error, error) {
  136. results := make([]O, len(job.data))
  137. errs := make([]error, len(job.data))
  138. if len(job.data) == 0 {
  139. return results, errs, nil
  140. }
  141. tasksQueue := make(chan *workerPoolTask[I, O])
  142. resultsQueue := make(chan *workerPoolTask[I, O])
  143. var wg sync.WaitGroup
  144. for range job.workers {
  145. wg.Add(1)
  146. go func() {
  147. defer wg.Done()
  148. for t := range tasksQueue {
  149. t.output, t.err = job.task(t.input)
  150. resultsQueue <- t
  151. }
  152. }()
  153. }
  154. var err error
  155. go func() {
  156. loop:
  157. for i := range job.data {
  158. select {
  159. default:
  160. tasksQueue <- &workerPoolTask[I, O]{
  161. index: i,
  162. input: job.data[i],
  163. }
  164. case <-job.ctx.Done():
  165. err = job.ctx.Err()
  166. break loop
  167. }
  168. }
  169. close(tasksQueue)
  170. wg.Wait()
  171. close(resultsQueue)
  172. }()
  173. for task := range resultsQueue {
  174. errs[task.index] = task.err
  175. results[task.index] = task.output
  176. }
  177. return results, errs, err
  178. }