2016-03-18 18:50:19 +00:00
package libcontainerd
import (
"fmt"
"io"
2016-03-23 22:41:47 +00:00
"io/ioutil"
"log"
2016-03-18 18:50:19 +00:00
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
2016-04-01 00:56:21 +00:00
"strings"
2016-03-18 18:50:19 +00:00
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types"
2016-03-29 16:43:12 +00:00
"github.com/docker/docker/pkg/locker"
2016-03-18 18:50:19 +00:00
sysinfo "github.com/docker/docker/pkg/system"
"github.com/docker/docker/utils"
2016-07-10 18:11:27 +00:00
"github.com/golang/protobuf/ptypes"
2016-07-11 15:55:39 +00:00
"github.com/golang/protobuf/ptypes/timestamp"
2016-03-18 18:50:19 +00:00
"golang.org/x/net/context"
"google.golang.org/grpc"
2016-03-23 22:41:47 +00:00
"google.golang.org/grpc/grpclog"
2016-04-15 04:06:26 +00:00
"google.golang.org/grpc/transport"
2016-03-18 18:50:19 +00:00
)
const (
maxConnectionRetryCount = 3
connectionRetryDelay = 3 * time . Second
containerdShutdownTimeout = 15 * time . Second
2016-03-23 00:55:47 +00:00
containerdBinary = "docker-containerd"
containerdPidFilename = "docker-containerd.pid"
containerdSockFilename = "docker-containerd.sock"
2016-05-27 21:45:43 +00:00
containerdStateDir = "containerd"
2016-03-18 18:50:19 +00:00
eventTimestampFilename = "event.ts"
)
type remote struct {
sync . RWMutex
2016-07-11 15:55:39 +00:00
apiClient containerd . APIClient
daemonPid int
stateDir string
rpcAddr string
startDaemon bool
closeManually bool
debugLog bool
rpcConn * grpc . ClientConn
clients [ ] * client
eventTsPath string
runtime string
runtimeArgs [ ] string
daemonWaitCh chan struct { }
liveRestore bool
oomScore int
restoreFromTimestamp * timestamp . Timestamp
2016-03-18 18:50:19 +00:00
}
// New creates a fresh instance of libcontainerd remote.
func New ( stateDir string , options ... RemoteOption ) ( _ Remote , err error ) {
defer func ( ) {
if err != nil {
err = fmt . Errorf ( "Failed to connect to containerd. Please make sure containerd is installed in your PATH or you have specificed the correct address. Got error: %v" , err )
}
} ( )
r := & remote {
stateDir : stateDir ,
daemonPid : - 1 ,
eventTsPath : filepath . Join ( stateDir , eventTimestampFilename ) ,
}
for _ , option := range options {
if err := option . Apply ( r ) ; err != nil {
return nil , err
}
}
if err := sysinfo . MkdirAll ( stateDir , 0700 ) ; err != nil {
return nil , err
}
if r . rpcAddr == "" {
r . rpcAddr = filepath . Join ( stateDir , containerdSockFilename )
}
if r . startDaemon {
if err := r . runContainerdDaemon ( ) ; err != nil {
return nil , err
}
}
2016-03-23 22:41:47 +00:00
// don't output the grpc reconnect logging
grpclog . SetLogger ( log . New ( ioutil . Discard , "" , log . LstdFlags ) )
2016-03-18 18:50:19 +00:00
dialOpts := append ( [ ] grpc . DialOption { grpc . WithInsecure ( ) } ,
grpc . WithDialer ( func ( addr string , timeout time . Duration ) ( net . Conn , error ) {
return net . DialTimeout ( "unix" , addr , timeout )
} ) ,
)
conn , err := grpc . Dial ( r . rpcAddr , dialOpts ... )
if err != nil {
return nil , fmt . Errorf ( "error connecting to containerd: %v" , err )
}
r . rpcConn = conn
r . apiClient = containerd . NewAPIClient ( conn )
2016-07-11 15:55:39 +00:00
// Get the timestamp to restore from
t := r . getLastEventTimestamp ( )
tsp , err := ptypes . TimestampProto ( t )
if err != nil {
logrus . Errorf ( "libcontainerd: failed to convert timestamp: %q" , err )
}
r . restoreFromTimestamp = tsp
2016-03-18 18:50:19 +00:00
go r . handleConnectionChange ( )
if err := r . startEventsMonitor ( ) ; err != nil {
return nil , err
}
return r , nil
}
2016-06-02 18:10:55 +00:00
func ( r * remote ) UpdateOptions ( options ... RemoteOption ) error {
for _ , option := range options {
if err := option . Apply ( r ) ; err != nil {
return err
}
}
return nil
}
2016-03-18 18:50:19 +00:00
func ( r * remote ) handleConnectionChange ( ) {
var transientFailureCount = 0
state := grpc . Idle
for {
s , err := r . rpcConn . WaitForStateChange ( context . Background ( ) , state )
if err != nil {
break
}
state = s
2016-07-22 22:20:14 +00:00
logrus . Debugf ( "libcontainerd: containerd connection state change: %v" , s )
2016-03-18 18:50:19 +00:00
if r . daemonPid != - 1 {
switch state {
case grpc . TransientFailure :
// Reset state to be notified of next failure
transientFailureCount ++
if transientFailureCount >= maxConnectionRetryCount {
transientFailureCount = 0
if utils . IsProcessAlive ( r . daemonPid ) {
utils . KillProcess ( r . daemonPid )
}
2016-07-13 17:04:42 +00:00
<- r . daemonWaitCh
2016-03-18 18:50:19 +00:00
if err := r . runContainerdDaemon ( ) ; err != nil { //FIXME: Handle error
2016-07-22 22:20:14 +00:00
logrus . Errorf ( "libcontainerd: error restarting containerd: %v" , err )
2016-03-18 18:50:19 +00:00
}
} else {
state = grpc . Idle
time . Sleep ( connectionRetryDelay )
}
case grpc . Shutdown :
// Well, we asked for it to stop, just return
return
}
}
}
}
func ( r * remote ) Cleanup ( ) {
if r . daemonPid == - 1 {
return
}
2016-04-15 04:06:26 +00:00
r . closeManually = true
2016-03-18 18:50:19 +00:00
r . rpcConn . Close ( )
// Ask the daemon to quit
syscall . Kill ( r . daemonPid , syscall . SIGTERM )
// Wait up to 15secs for it to stop
for i := time . Duration ( 0 ) ; i < containerdShutdownTimeout ; i += time . Second {
if ! utils . IsProcessAlive ( r . daemonPid ) {
break
}
time . Sleep ( time . Second )
}
if utils . IsProcessAlive ( r . daemonPid ) {
logrus . Warnf ( "libcontainerd: containerd (%d) didn't stop within 15 secs, killing it\n" , r . daemonPid )
syscall . Kill ( r . daemonPid , syscall . SIGKILL )
}
// cleanup some files
os . Remove ( filepath . Join ( r . stateDir , containerdPidFilename ) )
os . Remove ( filepath . Join ( r . stateDir , containerdSockFilename ) )
}
func ( r * remote ) Client ( b Backend ) ( Client , error ) {
c := & client {
clientCommon : clientCommon {
2016-03-29 16:43:12 +00:00
backend : b ,
containers : make ( map [ string ] * container ) ,
locker : locker . New ( ) ,
2016-03-18 18:50:19 +00:00
} ,
remote : r ,
exitNotifiers : make ( map [ string ] * exitNotifier ) ,
2016-06-02 18:10:55 +00:00
liveRestore : r . liveRestore ,
2016-03-18 18:50:19 +00:00
}
r . Lock ( )
r . clients = append ( r . clients , c )
r . Unlock ( )
return c , nil
}
func ( r * remote ) updateEventTimestamp ( t time . Time ) {
f , err := os . OpenFile ( r . eventTsPath , syscall . O_CREAT | syscall . O_WRONLY | syscall . O_TRUNC , 0600 )
defer f . Close ( )
if err != nil {
logrus . Warnf ( "libcontainerd: failed to open event timestamp file: %v" , err )
return
}
b , err := t . MarshalText ( )
if err != nil {
logrus . Warnf ( "libcontainerd: failed to encode timestamp: %v" , err )
return
}
n , err := f . Write ( b )
if err != nil || n != len ( b ) {
logrus . Warnf ( "libcontainerd: failed to update event timestamp file: %v" , err )
f . Truncate ( 0 )
return
}
}
2016-07-10 18:11:27 +00:00
func ( r * remote ) getLastEventTimestamp ( ) time . Time {
2016-03-18 18:50:19 +00:00
t := time . Now ( )
fi , err := os . Stat ( r . eventTsPath )
2016-03-29 00:33:09 +00:00
if os . IsNotExist ( err ) || fi . Size ( ) == 0 {
2016-07-10 18:11:27 +00:00
return t
2016-03-18 18:50:19 +00:00
}
f , err := os . Open ( r . eventTsPath )
defer f . Close ( )
if err != nil {
2016-06-11 17:42:38 +00:00
logrus . Warnf ( "libcontainerd: Unable to access last event ts: %v" , err )
2016-07-10 18:11:27 +00:00
return t
2016-03-18 18:50:19 +00:00
}
b := make ( [ ] byte , fi . Size ( ) )
n , err := f . Read ( b )
if err != nil || n != len ( b ) {
2016-06-11 17:42:38 +00:00
logrus . Warnf ( "libcontainerd: Unable to read last event ts: %v" , err )
2016-07-10 18:11:27 +00:00
return t
2016-03-18 18:50:19 +00:00
}
t . UnmarshalText ( b )
2016-07-10 18:11:27 +00:00
return t
2016-03-18 18:50:19 +00:00
}
func ( r * remote ) startEventsMonitor ( ) error {
// First, get past events
2016-07-11 15:55:39 +00:00
t := r . getLastEventTimestamp ( )
tsp , err := ptypes . TimestampProto ( t )
2016-07-10 18:11:27 +00:00
if err != nil {
logrus . Errorf ( "libcontainerd: failed to convert timestamp: %q" , err )
}
2016-03-18 18:50:19 +00:00
er := & containerd . EventsRequest {
2016-07-10 18:11:27 +00:00
Timestamp : tsp ,
2016-03-18 18:50:19 +00:00
}
events , err := r . apiClient . Events ( context . Background ( ) , er )
if err != nil {
return err
}
go r . handleEventStream ( events )
return nil
}
func ( r * remote ) handleEventStream ( events containerd . API_EventsClient ) {
for {
e , err := events . Recv ( )
if err != nil {
2016-04-15 04:06:26 +00:00
if grpc . ErrorDesc ( err ) == transport . ErrConnClosing . Desc &&
r . closeManually {
// ignore error if grpc remote connection is closed manually
return
}
2016-07-22 22:20:14 +00:00
logrus . Errorf ( "libcontainerd: failed to receive event from containerd: %v" , err )
2016-03-18 18:50:19 +00:00
go r . startEventsMonitor ( )
return
}
2016-07-22 22:20:14 +00:00
logrus . Debugf ( "libcontainerd: received containerd event: %#v" , e )
2016-03-18 18:50:19 +00:00
2016-07-10 18:11:27 +00:00
var container * container
var c * client
r . RLock ( )
for _ , c = range r . clients {
container , err = c . getContainer ( e . Id )
if err == nil {
break
2016-03-18 18:50:19 +00:00
}
2016-07-10 18:11:27 +00:00
}
r . RUnlock ( )
if container == nil {
2016-07-11 15:55:39 +00:00
logrus . Warnf ( "libcontainerd: unknown container %s" , e . Id )
2016-07-10 18:11:27 +00:00
continue
}
if err := container . handleEvent ( e ) ; err != nil {
logrus . Errorf ( "libcontainerd: error processing state change for %s: %v" , e . Id , err )
}
2016-03-18 18:50:19 +00:00
2016-07-10 18:11:27 +00:00
tsp , err := ptypes . Timestamp ( e . Timestamp )
if err != nil {
logrus . Errorf ( "libcontainerd: failed to convert event timestamp: %q" , err )
continue
2016-03-18 18:50:19 +00:00
}
2016-07-10 18:11:27 +00:00
r . updateEventTimestamp ( tsp )
2016-03-18 18:50:19 +00:00
}
}
func ( r * remote ) runContainerdDaemon ( ) error {
pidFilename := filepath . Join ( r . stateDir , containerdPidFilename )
f , err := os . OpenFile ( pidFilename , os . O_RDWR | os . O_CREATE , 0600 )
defer f . Close ( )
if err != nil {
return err
}
// File exist, check if the daemon is alive
b := make ( [ ] byte , 8 )
n , err := f . Read ( b )
if err != nil && err != io . EOF {
return err
}
if n > 0 {
pid , err := strconv . ParseUint ( string ( b [ : n ] ) , 10 , 64 )
if err != nil {
return err
}
if utils . IsProcessAlive ( int ( pid ) ) {
2016-07-22 22:20:14 +00:00
logrus . Infof ( "libcontainerd: previous instance of containerd still alive (%d)" , pid )
2016-03-18 18:50:19 +00:00
r . daemonPid = int ( pid )
return nil
}
}
// rewind the file
_ , err = f . Seek ( 0 , os . SEEK_SET )
if err != nil {
return err
}
// Truncate it
err = f . Truncate ( 0 )
if err != nil {
return err
}
// Start a new instance
2016-05-09 22:17:10 +00:00
args := [ ] string {
"-l" , fmt . Sprintf ( "unix://%s" , r . rpcAddr ) ,
"--shim" , "docker-containerd-shim" ,
"--metrics-interval=0" ,
2016-06-01 20:00:25 +00:00
"--start-timeout" , "2m" ,
2016-05-27 21:45:43 +00:00
"--state-dir" , filepath . Join ( r . stateDir , containerdStateDir ) ,
2016-05-09 22:17:10 +00:00
}
2016-05-23 21:49:50 +00:00
if r . runtime != "" {
args = append ( args , "--runtime" )
args = append ( args , r . runtime )
}
2016-03-18 18:50:19 +00:00
if r . debugLog {
2016-04-19 20:37:18 +00:00
args = append ( args , "--debug" )
2016-03-24 16:18:03 +00:00
}
if len ( r . runtimeArgs ) > 0 {
for _ , v := range r . runtimeArgs {
args = append ( args , "--runtime-args" )
args = append ( args , v )
}
2016-07-22 22:20:14 +00:00
logrus . Debugf ( "libcontainerd: runContainerdDaemon: runtimeArgs: %s" , args )
2016-03-18 18:50:19 +00:00
}
2016-04-01 00:56:21 +00:00
2016-03-18 18:50:19 +00:00
cmd := exec . Command ( containerdBinary , args ... )
2016-03-30 19:25:51 +00:00
// redirect containerd logs to docker logs
cmd . Stdout = os . Stdout
cmd . Stderr = os . Stderr
2016-05-02 18:23:38 +00:00
cmd . SysProcAttr = & syscall . SysProcAttr { Setsid : true , Pdeathsig : syscall . SIGKILL }
2016-04-01 00:56:21 +00:00
cmd . Env = nil
// clear the NOTIFY_SOCKET from the env when starting containerd
for _ , e := range os . Environ ( ) {
if ! strings . HasPrefix ( e , "NOTIFY_SOCKET" ) {
cmd . Env = append ( cmd . Env , e )
}
}
2016-03-18 18:50:19 +00:00
if err := cmd . Start ( ) ; err != nil {
return err
}
2016-07-22 22:20:14 +00:00
logrus . Infof ( "libcontainerd: new containerd process, pid: %d" , cmd . Process . Pid )
2016-07-11 22:26:23 +00:00
if err := setOOMScore ( cmd . Process . Pid , r . oomScore ) ; err != nil {
utils . KillProcess ( cmd . Process . Pid )
return err
}
2016-03-18 18:50:19 +00:00
if _ , err := f . WriteString ( fmt . Sprintf ( "%d" , cmd . Process . Pid ) ) ; err != nil {
utils . KillProcess ( cmd . Process . Pid )
return err
}
2016-06-01 00:47:39 +00:00
r . daemonWaitCh = make ( chan struct { } )
go func ( ) {
cmd . Wait ( )
close ( r . daemonWaitCh )
} ( ) // Reap our child when needed
2016-03-18 18:50:19 +00:00
r . daemonPid = cmd . Process . Pid
return nil
}
2016-07-11 22:26:23 +00:00
func setOOMScore ( pid , score int ) error {
f , err := os . OpenFile ( fmt . Sprintf ( "/proc/%d/oom_score_adj" , pid ) , os . O_WRONLY , 0 )
if err != nil {
return err
}
_ , err = f . WriteString ( strconv . Itoa ( score ) )
f . Close ( )
return err
}
2016-03-18 18:50:19 +00:00
// WithRemoteAddr sets the external containerd socket to connect to.
func WithRemoteAddr ( addr string ) RemoteOption {
return rpcAddr ( addr )
}
type rpcAddr string
func ( a rpcAddr ) Apply ( r Remote ) error {
if remote , ok := r . ( * remote ) ; ok {
remote . rpcAddr = string ( a )
return nil
}
return fmt . Errorf ( "WithRemoteAddr option not supported for this remote" )
}
2016-05-23 21:49:50 +00:00
// WithRuntimePath sets the path of the runtime to be used as the
// default by containerd
func WithRuntimePath ( rt string ) RemoteOption {
return runtimePath ( rt )
}
type runtimePath string
func ( rt runtimePath ) Apply ( r Remote ) error {
if remote , ok := r . ( * remote ) ; ok {
remote . runtime = string ( rt )
return nil
}
return fmt . Errorf ( "WithRuntime option not supported for this remote" )
}
2016-03-24 16:18:03 +00:00
// WithRuntimeArgs sets the list of runtime args passed to containerd
func WithRuntimeArgs ( args [ ] string ) RemoteOption {
return runtimeArgs ( args )
}
type runtimeArgs [ ] string
func ( rt runtimeArgs ) Apply ( r Remote ) error {
if remote , ok := r . ( * remote ) ; ok {
remote . runtimeArgs = rt
return nil
}
return fmt . Errorf ( "WithRuntimeArgs option not supported for this remote" )
}
2016-03-18 18:50:19 +00:00
// WithStartDaemon defines if libcontainerd should also run containerd daemon.
func WithStartDaemon ( start bool ) RemoteOption {
return startDaemon ( start )
}
type startDaemon bool
func ( s startDaemon ) Apply ( r Remote ) error {
if remote , ok := r . ( * remote ) ; ok {
remote . startDaemon = bool ( s )
return nil
}
return fmt . Errorf ( "WithStartDaemon option not supported for this remote" )
}
// WithDebugLog defines if containerd debug logs will be enabled for daemon.
func WithDebugLog ( debug bool ) RemoteOption {
return debugLog ( debug )
}
type debugLog bool
func ( d debugLog ) Apply ( r Remote ) error {
if remote , ok := r . ( * remote ) ; ok {
remote . debugLog = bool ( d )
return nil
}
return fmt . Errorf ( "WithDebugLog option not supported for this remote" )
}
2016-06-02 18:10:55 +00:00
// WithLiveRestore defines if containers are stopped on shutdown or restored.
func WithLiveRestore ( v bool ) RemoteOption {
return liveRestore ( v )
}
type liveRestore bool
func ( l liveRestore ) Apply ( r Remote ) error {
if remote , ok := r . ( * remote ) ; ok {
remote . liveRestore = bool ( l )
for _ , c := range remote . clients {
c . liveRestore = bool ( l )
}
return nil
}
return fmt . Errorf ( "WithLiveRestore option not supported for this remote" )
}
2016-07-11 22:26:23 +00:00
// WithOOMScore defines the oom_score_adj to set for the containerd process.
func WithOOMScore ( score int ) RemoteOption {
return oomScore ( score )
}
type oomScore int
func ( o oomScore ) Apply ( r Remote ) error {
if remote , ok := r . ( * remote ) ; ok {
remote . oomScore = int ( o )
return nil
}
return fmt . Errorf ( "WithOOMScore option not supported for this remote" )
}