assemble.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package asm
  2. import (
  3. "bytes"
  4. "fmt"
  5. "hash"
  6. "hash/crc64"
  7. "io"
  8. "sync"
  9. "github.com/vbatts/tar-split/tar/storage"
  10. )
  11. // NewOutputTarStream returns an io.ReadCloser that is an assembled tar archive
  12. // stream.
  13. //
  14. // It takes a storage.FileGetter, for mapping the file payloads that are to be read in,
  15. // and a storage.Unpacker, which has access to the rawbytes and file order
  16. // metadata. With the combination of these two items, a precise assembled Tar
  17. // archive is possible.
  18. func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadCloser {
  19. // ... Since these are interfaces, this is possible, so let's not have a nil pointer
  20. if fg == nil || up == nil {
  21. return nil
  22. }
  23. pr, pw := io.Pipe()
  24. go func() {
  25. err := WriteOutputTarStream(fg, up, pw)
  26. if err != nil {
  27. pw.CloseWithError(err)
  28. } else {
  29. pw.Close()
  30. }
  31. }()
  32. return pr
  33. }
  34. // WriteOutputTarStream writes assembled tar archive to a writer.
  35. func WriteOutputTarStream(fg storage.FileGetter, up storage.Unpacker, w io.Writer) error {
  36. // ... Since these are interfaces, this is possible, so let's not have a nil pointer
  37. if fg == nil || up == nil {
  38. return nil
  39. }
  40. var copyBuffer []byte
  41. var crcHash hash.Hash
  42. var crcSum []byte
  43. var multiWriter io.Writer
  44. for {
  45. entry, err := up.Next()
  46. if err != nil {
  47. if err == io.EOF {
  48. return nil
  49. }
  50. return err
  51. }
  52. switch entry.Type {
  53. case storage.SegmentType:
  54. if _, err := w.Write(entry.Payload); err != nil {
  55. return err
  56. }
  57. case storage.FileType:
  58. if entry.Size == 0 {
  59. continue
  60. }
  61. fh, err := fg.Get(entry.GetName())
  62. if err != nil {
  63. return err
  64. }
  65. if crcHash == nil {
  66. crcHash = crc64.New(storage.CRCTable)
  67. crcSum = make([]byte, 8)
  68. multiWriter = io.MultiWriter(w, crcHash)
  69. copyBuffer = byteBufferPool.Get().([]byte)
  70. // TODO once we have some benchmark or memory profile then we can experiment with using *bytes.Buffer
  71. //nolint:staticcheck // SA6002 not going to do a pointer here
  72. defer byteBufferPool.Put(copyBuffer)
  73. } else {
  74. crcHash.Reset()
  75. }
  76. if _, err := copyWithBuffer(multiWriter, fh, copyBuffer); err != nil {
  77. fh.Close()
  78. return err
  79. }
  80. if !bytes.Equal(crcHash.Sum(crcSum[:0]), entry.Payload) {
  81. // I would rather this be a comparable ErrInvalidChecksum or such,
  82. // but since it's coming through the PipeReader, the context of
  83. // _which_ file would be lost...
  84. fh.Close()
  85. return fmt.Errorf("file integrity checksum failed for %q", entry.GetName())
  86. }
  87. fh.Close()
  88. }
  89. }
  90. }
  // byteBufferPool recycles the 32 KiB scratch slices used as copy buffers
// when streaming file payloads, so successive assemblies can reuse a buffer
// instead of allocating a fresh one. Values stored in the pool are plain
// []byte (not pointers).
var byteBufferPool = &sync.Pool{
	New: func() interface{} {
		const copyBufferSize = 32 << 10 // 32 KiB, the same size io.Copy uses by default
		buf := make([]byte, copyBufferSize)
		return buf
	},
}
  // copyWithBuffer copies from src to dst, staging data through buf. It stops
// when src reports io.EOF or when an error occurs. It returns the number of
// bytes written to dst and the first error encountered; reaching io.EOF is
// not treated as an error.
//
// It is adapted from the stdlib io.Copy implementation
// (https://github.com/golang/go/blob/go1.5.1/src/io/io.go#L367). Unlike
// io.CopyBuffer, it only ever calls Read and Write, never the
// WriterTo/ReaderFrom shortcuts, so buf is always the staging area.
//
// A nil or zero-length buf can never make progress, because each Read into
// it returns 0 bytes. In that case a 32 KiB buffer is allocated instead of
// looping forever. A writer reporting a byte count outside [0, len(p)] is
// treated as a short write, and its count is not added to written.
func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
	if len(buf) == 0 {
		buf = make([]byte, 32*1024)
	}
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[:nr])
			if nw < 0 || nw > nr {
				// Misbehaving writer: its byte count cannot be trusted.
				nw = 0
				if ew == nil {
					ew = io.ErrShortWrite
				}
			}
			written += int64(nw)
			if ew != nil {
				return written, ew
			}
			if nw != nr {
				return written, io.ErrShortWrite
			}
		}
		if er == io.EOF {
			return written, nil
		}
		if er != nil {
			return written, er
		}
	}
}