|
@@ -3,8 +3,10 @@ package asm
|
|
|
import (
|
|
|
"bytes"
|
|
|
"fmt"
|
|
|
+ "hash"
|
|
|
"hash/crc64"
|
|
|
"io"
|
|
|
+ "sync"
|
|
|
|
|
|
"github.com/vbatts/tar-split/tar/storage"
|
|
|
)
|
|
@@ -23,45 +25,106 @@ func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadClose
|
|
|
}
|
|
|
pr, pw := io.Pipe()
|
|
|
go func() {
|
|
|
- for {
|
|
|
- entry, err := up.Next()
|
|
|
+ err := WriteOutputTarStream(fg, up, pw)
|
|
|
+ if err != nil {
|
|
|
+ pw.CloseWithError(err)
|
|
|
+ } else {
|
|
|
+ pw.Close()
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ return pr
|
|
|
+}
|
|
|
+
|
|
|
+// WriteOutputTarStream writes assembled tar archive to a writer.
|
|
|
+func WriteOutputTarStream(fg storage.FileGetter, up storage.Unpacker, w io.Writer) error {
|
|
|
+ // ... Since these are interfaces, this is possible, so let's not have a nil pointer
|
|
|
+ if fg == nil || up == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ var copyBuffer []byte
|
|
|
+ var crcHash hash.Hash
|
|
|
+ var crcSum []byte
|
|
|
+ var multiWriter io.Writer
|
|
|
+ for {
|
|
|
+ entry, err := up.Next()
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ switch entry.Type {
|
|
|
+ case storage.SegmentType:
|
|
|
+ if _, err := w.Write(entry.Payload); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ case storage.FileType:
|
|
|
+ if entry.Size == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ fh, err := fg.Get(entry.GetName())
|
|
|
if err != nil {
|
|
|
- pw.CloseWithError(err)
|
|
|
- return
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if crcHash == nil {
|
|
|
+ crcHash = crc64.New(storage.CRCTable)
|
|
|
+ crcSum = make([]byte, 8)
|
|
|
+ multiWriter = io.MultiWriter(w, crcHash)
|
|
|
+ copyBuffer = byteBufferPool.Get().([]byte)
|
|
|
+ defer byteBufferPool.Put(copyBuffer)
|
|
|
+ } else {
|
|
|
+ crcHash.Reset()
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, err := copyWithBuffer(multiWriter, fh, copyBuffer); err != nil {
|
|
|
+ fh.Close()
|
|
|
+ return err
|
|
|
}
|
|
|
- switch entry.Type {
|
|
|
- case storage.SegmentType:
|
|
|
- if _, err := pw.Write(entry.Payload); err != nil {
|
|
|
- pw.CloseWithError(err)
|
|
|
- return
|
|
|
- }
|
|
|
- case storage.FileType:
|
|
|
- if entry.Size == 0 {
|
|
|
- continue
|
|
|
- }
|
|
|
- fh, err := fg.Get(entry.GetName())
|
|
|
- if err != nil {
|
|
|
- pw.CloseWithError(err)
|
|
|
- return
|
|
|
- }
|
|
|
- c := crc64.New(storage.CRCTable)
|
|
|
- tRdr := io.TeeReader(fh, c)
|
|
|
- if _, err := io.Copy(pw, tRdr); err != nil {
|
|
|
- fh.Close()
|
|
|
- pw.CloseWithError(err)
|
|
|
- return
|
|
|
- }
|
|
|
- if !bytes.Equal(c.Sum(nil), entry.Payload) {
|
|
|
- // I would rather this be a comparable ErrInvalidChecksum or such,
|
|
|
- // but since it's coming through the PipeReader, the context of
|
|
|
- // _which_ file would be lost...
|
|
|
- fh.Close()
|
|
|
- pw.CloseWithError(fmt.Errorf("file integrity checksum failed for %q", entry.GetName()))
|
|
|
- return
|
|
|
- }
|
|
|
+
|
|
|
+ if !bytes.Equal(crcHash.Sum(crcSum[:0]), entry.Payload) {
|
|
|
+ // I would rather this be a comparable ErrInvalidChecksum or such,
|
|
|
+ // but since it's coming through the PipeReader, the context of
|
|
|
+ // _which_ file would be lost...
|
|
|
fh.Close()
|
|
|
+ return fmt.Errorf("file integrity checksum failed for %q", entry.GetName())
|
|
|
}
|
|
|
+ fh.Close()
|
|
|
}
|
|
|
- }()
|
|
|
- return pr
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+var byteBufferPool = &sync.Pool{
|
|
|
+ New: func() interface{} {
|
|
|
+ return make([]byte, 32*1024)
|
|
|
+ },
|
|
|
+}
|
|
|
+
|
|
|
+// copyWithBuffer is taken from stdlib io.Copy implementation
|
|
|
+// https://github.com/golang/go/blob/go1.5.1/src/io/io.go#L367
|
|
|
+func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
|
|
|
+ for {
|
|
|
+ nr, er := src.Read(buf)
|
|
|
+ if nr > 0 {
|
|
|
+ nw, ew := dst.Write(buf[0:nr])
|
|
|
+ if nw > 0 {
|
|
|
+ written += int64(nw)
|
|
|
+ }
|
|
|
+ if ew != nil {
|
|
|
+ err = ew
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if nr != nw {
|
|
|
+ err = io.ErrShortWrite
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if er == io.EOF {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if er != nil {
|
|
|
+ err = er
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return written, err
|
|
|
}
|