sharedtemp.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "io"
  4. "io/fs"
  5. "os"
  6. "runtime"
  7. )
  8. type fileConvertFn func(dst io.WriteSeeker, src io.ReadSeeker) error
  9. type stfID uint64
  10. // sharedTempFileConverter converts files using a user-supplied function and
  11. // writes the results to temporary files which are automatically cleaned up on
  12. // close. If another request is made to convert the same file, the conversion
  13. // result and temporary file are reused if they have not yet been cleaned up.
  14. //
  15. // A file is considered the same as another file using the os.SameFile function,
  16. // which compares file identity (e.g. device and inode numbers on Linux) and is
  17. // robust to file renames. Input files are assumed to be immutable; no attempt
  18. // is made to ascertain whether the file contents have changed between requests.
  19. //
  20. // One file descriptor is used per source file, irrespective of the number of
  21. // concurrent readers of the converted contents.
  22. type sharedTempFileConverter struct {
  23. // The directory where temporary converted files are to be written to.
  24. // If set to the empty string, the default directory for temporary files
  25. // is used.
  26. TempDir string
  27. conv fileConvertFn
  28. st chan stfcState
  29. }
  30. type stfcState struct {
  31. fl map[stfID]sharedTempFile
  32. nextID stfID
  33. }
  34. type sharedTempFile struct {
  35. src os.FileInfo // Info about the source file for path-independent identification with os.SameFile.
  36. fd *os.File
  37. size int64
  38. ref int // Reference count of open readers on the temporary file.
  39. wait []chan<- stfConvertResult // Wait list for the conversion to complete.
  40. }
  41. type stfConvertResult struct {
  42. fr *sharedFileReader
  43. err error
  44. }
  45. func newSharedTempFileConverter(conv fileConvertFn) *sharedTempFileConverter {
  46. st := make(chan stfcState, 1)
  47. st <- stfcState{fl: make(map[stfID]sharedTempFile)}
  48. return &sharedTempFileConverter{conv: conv, st: st}
  49. }
  50. // Do returns a reader for the contents of f as converted by the c.C function.
  51. // It is the caller's responsibility to close the returned reader.
  52. //
  53. // This function is safe for concurrent use by multiple goroutines.
  54. func (c *sharedTempFileConverter) Do(f *os.File) (*sharedFileReader, error) {
  55. stat, err := f.Stat()
  56. if err != nil {
  57. return nil, err
  58. }
  59. st := <-c.st
  60. for id, tf := range st.fl {
  61. // os.SameFile can have false positives if one of the files was
  62. // deleted before the other file was created -- such as during
  63. // log rotations... https://github.com/golang/go/issues/36895
  64. // Weed out those false positives by also comparing the files'
  65. // ModTime, which conveniently also handles the case of true
  66. // positives where the file has also been modified since it was
  67. // first converted.
  68. if os.SameFile(tf.src, stat) && tf.src.ModTime() == stat.ModTime() {
  69. return c.openExisting(st, id, tf)
  70. }
  71. }
  72. return c.openNew(st, f, stat)
  73. }
  74. func (c *sharedTempFileConverter) openNew(st stfcState, f *os.File, stat os.FileInfo) (*sharedFileReader, error) {
  75. // Record that we are starting to convert this file so that any other
  76. // requests for the same source file while the conversion is in progress
  77. // can join.
  78. id := st.nextID
  79. st.nextID++
  80. st.fl[id] = sharedTempFile{src: stat}
  81. c.st <- st
  82. dst, size, convErr := c.convert(f)
  83. st = <-c.st
  84. flid := st.fl[id]
  85. if convErr != nil {
  86. // Conversion failed. Delete it from the state so that future
  87. // requests to convert the same file can try again fresh.
  88. delete(st.fl, id)
  89. c.st <- st
  90. for _, w := range flid.wait {
  91. w <- stfConvertResult{err: convErr}
  92. }
  93. return nil, convErr
  94. }
  95. flid.fd = dst
  96. flid.size = size
  97. flid.ref = len(flid.wait) + 1
  98. for _, w := range flid.wait {
  99. // Each waiter needs its own reader with an independent read pointer.
  100. w <- stfConvertResult{fr: flid.Reader(c, id)}
  101. }
  102. flid.wait = nil
  103. st.fl[id] = flid
  104. c.st <- st
  105. return flid.Reader(c, id), nil
  106. }
  107. func (c *sharedTempFileConverter) openExisting(st stfcState, id stfID, v sharedTempFile) (*sharedFileReader, error) {
  108. if v.fd != nil {
  109. // Already converted.
  110. v.ref++
  111. st.fl[id] = v
  112. c.st <- st
  113. return v.Reader(c, id), nil
  114. }
  115. // The file has not finished being converted.
  116. // Add ourselves to the wait list. "Don't call us; we'll call you."
  117. wait := make(chan stfConvertResult, 1)
  118. v.wait = append(v.wait, wait)
  119. st.fl[id] = v
  120. c.st <- st
  121. res := <-wait
  122. return res.fr, res.err
  123. }
  124. func (c *sharedTempFileConverter) convert(f *os.File) (converted *os.File, size int64, err error) {
  125. dst, err := os.CreateTemp(c.TempDir, "dockerdtemp.*")
  126. if err != nil {
  127. return nil, 0, err
  128. }
  129. defer func() {
  130. _ = dst.Close()
  131. // Delete the temporary file immediately so that final cleanup
  132. // of the file on disk is deferred to the OS once we close all
  133. // our file descriptors (or the process dies). Assuming no early
  134. // returns due to errors, the file will be open by this process
  135. // with a read-only descriptor at this point. As we don't care
  136. // about being able to reuse the file name -- it's randomly
  137. // generated and unique -- we can safely use os.Remove on
  138. // Windows.
  139. _ = os.Remove(dst.Name())
  140. }()
  141. err = c.conv(dst, f)
  142. if err != nil {
  143. return nil, 0, err
  144. }
  145. // Close the exclusive read-write file descriptor, catching any delayed
  146. // write errors (and on Windows, releasing the share-locks on the file)
  147. if err := dst.Close(); err != nil {
  148. _ = os.Remove(dst.Name())
  149. return nil, 0, err
  150. }
  151. // Open the file again read-only (without locking the file against
  152. // deletion on Windows).
  153. converted, err = open(dst.Name())
  154. if err != nil {
  155. return nil, 0, err
  156. }
  157. // The position of the file's read pointer doesn't matter as all readers
  158. // will be accessing the file through its io.ReaderAt interface.
  159. size, err = converted.Seek(0, io.SeekEnd)
  160. if err != nil {
  161. _ = converted.Close()
  162. return nil, 0, err
  163. }
  164. return converted, size, nil
  165. }
  166. type sharedFileReader struct {
  167. *io.SectionReader
  168. c *sharedTempFileConverter
  169. id stfID
  170. closed bool
  171. }
  172. func (stf sharedTempFile) Reader(c *sharedTempFileConverter, id stfID) *sharedFileReader {
  173. rdr := &sharedFileReader{SectionReader: io.NewSectionReader(stf.fd, 0, stf.size), c: c, id: id}
  174. runtime.SetFinalizer(rdr, (*sharedFileReader).Close)
  175. return rdr
  176. }
  177. func (r *sharedFileReader) Close() error {
  178. if r.closed {
  179. return fs.ErrClosed
  180. }
  181. st := <-r.c.st
  182. flid, ok := st.fl[r.id]
  183. if !ok {
  184. panic("invariant violation: temp file state missing from map")
  185. }
  186. flid.ref--
  187. lastRef := flid.ref <= 0
  188. if lastRef {
  189. delete(st.fl, r.id)
  190. } else {
  191. st.fl[r.id] = flid
  192. }
  193. r.closed = true
  194. r.c.st <- st
  195. if lastRef {
  196. return flid.fd.Close()
  197. }
  198. runtime.SetFinalizer(r, nil)
  199. return nil
  200. }