Refactor stdin closing

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit 6f2658fb8c)
This commit is contained in:
Tonis Tiigi 2016-10-18 11:18:19 -07:00
parent c17155fbc3
commit 7e98d12157
3 changed files with 60 additions and 22 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/mount"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
@ -96,12 +97,26 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
return err
}
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
if err2 := p.sendCloseStdin(); err == nil {
err = err2
}
})
return err
})
container.processes[processFriendlyName] = p
clnt.unlock(containerID)
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
p.closeFifos(iopipe)
return err
}
clnt.lock(containerID)
@ -421,8 +436,18 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
if err != nil {
return err
}
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
})
return err
})
if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
container.closeFifos(iopipe)
return err
}
@ -435,6 +460,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
}})
if err != nil {
container.closeFifos(iopipe)
return err
}

View file

@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"time"
@ -93,22 +94,37 @@ func (ctr *container) start() error {
if err != nil {
return nil
}
createChan := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ready := make(chan struct{})
iopipe, err := ctr.openFifos(spec.Process.Terminal)
if err != nil {
return err
}
var stdinOnce sync.Once
// we need to delay stdin closure after container start or else "stdin close"
// event will be rejected by containerd.
// stdin closure happens in AttachStreams
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
go func() {
<-createChan
stdin.Close()
}()
return nil
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
go func() {
select {
case <-ready:
if err := ctr.sendCloseStdin(); err != nil {
logrus.Warnf("failed to close stdin: %+v")
}
case <-ctx.Done():
}
}()
})
return err
})
r := &containerd.CreateContainerRequest{
@ -125,20 +141,18 @@ func (ctr *container) start() error {
ctr.client.appendContainer(ctr)
if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
close(createChan)
ctr.closeFifos(iopipe)
return err
}
resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r)
if err != nil {
close(createChan)
ctr.closeFifos(iopipe)
return err
}
ctr.startedAt = time.Now()
ctr.systemPid = systemPid(resp.Container)
close(createChan)
close(ready)
return ctr.client.backend.StateChanged(ctr.containerID, StateInfo{
CommonStateInfo: CommonStateInfo{

View file

@ -9,7 +9,6 @@ import (
"time"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/tonistiigi/fifo"
"golang.org/x/net/context"
)
@ -37,14 +36,14 @@ func (p *process) openFifos(terminal bool) (pipe *IOPipe, err error) {
io := &IOPipe{}
stdin, err := fifo.OpenFifo(ctx, p.fifo(syscall.Stdin), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
io.Stdin, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdin), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
stdin.Close()
io.Stdin.Close()
}
}()
@ -73,19 +72,18 @@ func (p *process) openFifos(terminal bool) (pipe *IOPipe, err error) {
io.Stderr = ioutil.NopCloser(emptyReader{})
}
io.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
stdin.Close()
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
Id: p.containerID,
Pid: p.friendlyName,
CloseStdin: true,
})
return err
})
return io, nil
}
func (p *process) sendCloseStdin() error {
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
Id: p.containerID,
Pid: p.friendlyName,
CloseStdin: true,
})
return err
}
func (p *process) closeFifos(io *IOPipe) {
io.Stdin.Close()
io.Stdout.Close()