Update containerd to 6bff39c643886dfa3d546e83a90a5

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-11-30 11:11:33 -05:00
parent aa3ce07c41
commit c2cb302d17
21 changed files with 653 additions and 346 deletions

View file

@ -4,7 +4,7 @@ TOMLV_COMMIT=9baf8a8a9f2ed20a8e54160840c492f937eeaf9a
# When updating RUNC_COMMIT, also update runc in vendor.conf accordingly
RUNC_COMMIT=b2567b37d7b75eb4cf325b77297b140ea686ce8f
CONTAINERD_COMMIT=59bd1967112885c4d49e510e5570182865ba6fdd
CONTAINERD_COMMIT=6bff39c643886dfa3d546e83a90a527b64ddeacf
TINI_COMMIT=949e6facb77383876aeff8a6944dde66b3089574
LIBNETWORK_COMMIT=7b2b1feb1de4817d522cc372af149ff48d25028e
VNDR_COMMIT=a6e196d8b4b0cbbdc29aebdb20c59ac6926bb384

View file

@ -103,15 +103,15 @@ github.com/googleapis/gax-go da06d194a00e19ce00d9011a13931c3f6f6887c7
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
# containerd
github.com/containerd/containerd 59bd1967112885c4d49e510e5570182865ba6fdd
github.com/containerd/containerd 6bff39c643886dfa3d546e83a90a527b64ddeacf
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/continuity 35d55c5e8dd23b32037d56cf97174aff3efdfa83
github.com/containerd/cgroups f7dd103d3e4e696aa67152f6b4ddd1779a3455a9
github.com/containerd/cgroups 29da22c6171a4316169f9205ab6c49f59b5b852f
github.com/containerd/console 84eeaae905fa414d03e07bcd6c8d3f19e7cf180e
github.com/containerd/go-runc ed1cbe1fc31f5fb2359d3a54b6330d1a097858b7
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
github.com/dmcgowan/go-tar go1.10
github.com/stevvooe/ttrpc bdb2ab7a8169e485e39421e666e15a505e575fd2
github.com/stevvooe/ttrpc 8c92e22ce0c492875ccaac3ab06143a77d8ed0c1
# cluster
github.com/docker/swarmkit de950a7ed842c7b7e47e9451cde9bf8f96031894

View file

@ -310,7 +310,8 @@ func (c *cgroup) Thaw() error {
}
// OOMEventFD returns the memory cgroup's out of memory event fd that triggers
// when processes inside the cgroup receive an oom event
// when processes inside the cgroup receive an oom event. Returns
// ErrMemoryNotSupported if memory cgroups is not supported.
func (c *cgroup) OOMEventFD() (uintptr, error) {
c.mu.Lock()
defer c.mu.Unlock()

View file

@ -1,11 +1,11 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: github.com/containerd/containerd/api/services/snapshot/v1/snapshots.proto
// source: github.com/containerd/containerd/api/services/snapshots/v1/snapshots.proto
/*
Package snapshot is a generated protocol buffer package.
Package snapshots is a generated protocol buffer package.
It is generated from these files:
github.com/containerd/containerd/api/services/snapshot/v1/snapshots.proto
github.com/containerd/containerd/api/services/snapshots/v1/snapshots.proto
It has these top-level messages:
PrepareSnapshotRequest
@ -26,7 +26,7 @@
UsageRequest
UsageResponse
*/
package snapshot
package snapshots
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
@ -651,7 +651,7 @@ var _Snapshots_serviceDesc = grpc.ServiceDesc{
ServerStreams: true,
},
},
Metadata: "github.com/containerd/containerd/api/services/snapshot/v1/snapshots.proto",
Metadata: "github.com/containerd/containerd/api/services/snapshots/v1/snapshots.proto",
}
func (m *PrepareSnapshotRequest) Marshal() (dAtA []byte, err error) {
@ -4194,72 +4194,72 @@ var (
)
func init() {
proto.RegisterFile("github.com/containerd/containerd/api/services/snapshot/v1/snapshots.proto", fileDescriptorSnapshots)
proto.RegisterFile("github.com/containerd/containerd/api/services/snapshots/v1/snapshots.proto", fileDescriptorSnapshots)
}
var fileDescriptorSnapshots = []byte{
// 1006 bytes of a gzipped FileDescriptorProto
// 1007 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x6f, 0x1a, 0x47,
0x14, 0x67, 0x60, 0x8d, 0xe3, 0x87, 0xed, 0xd2, 0x09, 0x26, 0x68, 0x5b, 0xe1, 0x15, 0x87, 0xca,
0xea, 0x61, 0x37, 0xa1, 0x6a, 0xe2, 0xc4, 0x97, 0x02, 0xa1, 0x15, 0x71, 0xec, 0x54, 0x1b, 0xdb,
0x89, 0xd3, 0x48, 0xd1, 0x1a, 0xc6, 0x78, 0x05, 0xbb, 0x4b, 0x99, 0x81, 0x88, 0x56, 0xaa, 0x7a,
0x8c, 0x7c, 0xea, 0x17, 0xf0, 0xa9, 0xfd, 0x10, 0x55, 0x3f, 0x81, 0x8f, 0x3d, 0xf6, 0xd4, 0x36,
0xfe, 0x12, 0x3d, 0xf5, 0x8f, 0x66, 0x76, 0x16, 0x30, 0xa6, 0x62, 0xc1, 0xe4, 0xf6, 0x76, 0x66,
0xde, 0x7b, 0xbf, 0xf7, 0x7b, 0xf3, 0xde, 0x9b, 0x85, 0x4a, 0xdd, 0x66, 0x27, 0x9d, 0x23, 0xbd,
0xea, 0x39, 0x46, 0xd5, 0x73, 0x99, 0x65, 0xbb, 0xa4, 0x5d, 0x1b, 0x16, 0xad, 0x96, 0x6d, 0x50,
0xd2, 0xee, 0xda, 0x55, 0x42, 0x0d, 0xea, 0x5a, 0x2d, 0x7a, 0xe2, 0x31, 0xa3, 0x7b, 0xa7, 0x2f,
0x53, 0xbd, 0xd5, 0xf6, 0x98, 0x87, 0xb5, 0x81, 0x92, 0x1e, 0x28, 0xe8, 0x83, 0x43, 0xdd, 0x3b,
0x6a, 0xaa, 0xee, 0xd5, 0x3d, 0x71, 0xd8, 0xe0, 0x92, 0xaf, 0xa7, 0x7e, 0x50, 0xf7, 0xbc, 0x7a,
0x93, 0x18, 0xe2, 0xeb, 0xa8, 0x73, 0x6c, 0x10, 0xa7, 0xc5, 0x7a, 0x72, 0x53, 0x1b, 0xdd, 0x3c,
0xb6, 0x49, 0xb3, 0xf6, 0xca, 0xb1, 0x68, 0x43, 0x9e, 0x58, 0x1f, 0x3d, 0xc1, 0x6c, 0x87, 0x50,
0x66, 0x39, 0x2d, 0x79, 0xe0, 0x6e, 0xa8, 0x10, 0x59, 0xaf, 0x45, 0xa8, 0xe1, 0x78, 0x1d, 0x97,
0xf9, 0x7a, 0xb9, 0xbf, 0x11, 0xa4, 0xbf, 0x6c, 0x93, 0x96, 0xd5, 0x26, 0x4f, 0x65, 0x14, 0x26,
0xf9, 0xba, 0x43, 0x28, 0xc3, 0x1a, 0x24, 0x82, 0xc0, 0x18, 0x69, 0x67, 0x90, 0x86, 0x36, 0x96,
0xcc, 0xe1, 0x25, 0x9c, 0x84, 0x58, 0x83, 0xf4, 0x32, 0x51, 0xb1, 0xc3, 0x45, 0x9c, 0x86, 0x38,
0x37, 0xe5, 0xb2, 0x4c, 0x4c, 0x2c, 0xca, 0x2f, 0xfc, 0x12, 0xe2, 0x4d, 0xeb, 0x88, 0x34, 0x69,
0x46, 0xd1, 0x62, 0x1b, 0x89, 0xfc, 0x43, 0x7d, 0x12, 0x8f, 0xfa, 0x78, 0x54, 0xfa, 0x63, 0x61,
0xa6, 0xec, 0xb2, 0x76, 0xcf, 0x94, 0x36, 0xd5, 0xfb, 0x90, 0x18, 0x5a, 0x0e, 0x60, 0xa1, 0x01,
0xac, 0x14, 0x2c, 0x74, 0xad, 0x66, 0x87, 0x48, 0xa8, 0xfe, 0xc7, 0x83, 0xe8, 0x26, 0xca, 0x3d,
0x82, 0x5b, 0x57, 0x1c, 0xd1, 0x96, 0xe7, 0x52, 0x82, 0x0d, 0x88, 0x0b, 0xa6, 0x68, 0x06, 0x09,
0xcc, 0xb7, 0x86, 0x31, 0x0b, 0x26, 0xf5, 0x1d, 0xbe, 0x6f, 0xca, 0x63, 0xb9, 0xbf, 0x10, 0xdc,
0x3c, 0xb0, 0xc9, 0xeb, 0x77, 0x49, 0xe4, 0xe1, 0x08, 0x91, 0x85, 0xc9, 0x44, 0x8e, 0x81, 0x34,
0x6f, 0x16, 0xbf, 0x80, 0xd4, 0x65, 0x2f, 0xb3, 0x52, 0x58, 0x82, 0x15, 0xb1, 0x40, 0xaf, 0xc1,
0x5d, 0xae, 0x00, 0xab, 0x81, 0x91, 0x59, 0x71, 0x6c, 0xc3, 0x9a, 0x49, 0x1c, 0xaf, 0x3b, 0x8f,
0xa2, 0xe0, 0xf7, 0x62, 0xad, 0xe4, 0x39, 0x8e, 0xcd, 0xa6, 0xb7, 0x86, 0x41, 0x71, 0x2d, 0x27,
0xa0, 0x5c, 0xc8, 0x81, 0x87, 0xd8, 0x20, 0x33, 0x5f, 0x8d, 0xdc, 0x8a, 0xd2, 0xe4, 0x5b, 0x31,
0x16, 0xd0, 0xbc, 0xef, 0x45, 0x05, 0x6e, 0x3e, 0x65, 0x16, 0x9b, 0x07, 0x89, 0xff, 0x46, 0x41,
0xa9, 0xb8, 0xc7, 0x5e, 0x9f, 0x11, 0x34, 0xc4, 0xc8, 0xa0, 0x5a, 0xa2, 0x97, 0xaa, 0xe5, 0x01,
0x28, 0x0d, 0xdb, 0xad, 0x09, 0xaa, 0x56, 0xf3, 0x1f, 0x4d, 0x66, 0x65, 0xdb, 0x76, 0x6b, 0xa6,
0xd0, 0xc1, 0x25, 0x80, 0x6a, 0x9b, 0x58, 0x8c, 0xd4, 0x5e, 0x59, 0x2c, 0xa3, 0x68, 0x68, 0x23,
0x91, 0x57, 0x75, 0xbf, 0x0f, 0xeb, 0x41, 0x1f, 0xd6, 0xf7, 0x82, 0x3e, 0x5c, 0xbc, 0x71, 0xfe,
0xfb, 0x7a, 0xe4, 0x87, 0x3f, 0xd6, 0x91, 0xb9, 0x24, 0xf5, 0x0a, 0x8c, 0x1b, 0xe9, 0xb4, 0x6a,
0x81, 0x91, 0x85, 0x69, 0x8c, 0x48, 0xbd, 0x02, 0xc3, 0x8f, 0xfa, 0xd9, 0x8d, 0x8b, 0xec, 0xe6,
0x27, 0xc7, 0xc1, 0x99, 0x9a, 0x77, 0x32, 0x9f, 0x43, 0xea, 0x72, 0x32, 0x65, 0x71, 0x7d, 0x06,
0x8a, 0xed, 0x1e, 0x7b, 0xc2, 0x48, 0x22, 0x0c, 0xc9, 0x1c, 0x5c, 0x51, 0xe1, 0x91, 0x9a, 0x42,
0x33, 0xf7, 0x33, 0x82, 0xb5, 0x7d, 0x11, 0xee, 0xf4, 0x37, 0x25, 0xf0, 0x1e, 0x9d, 0xd5, 0x3b,
0xde, 0x82, 0x84, 0xcf, 0xb5, 0x18, 0xb8, 0xe2, 0xae, 0x8c, 0x4b, 0xd2, 0xe7, 0x7c, 0x26, 0xef,
0x58, 0xb4, 0x61, 0xca, 0x94, 0x72, 0x39, 0xf7, 0x02, 0xd2, 0xa3, 0xc8, 0xe7, 0x46, 0xcb, 0x26,
0xa4, 0x1e, 0xdb, 0xb4, 0x4f, 0x78, 0xf8, 0x9e, 0x98, 0x3b, 0x84, 0xb5, 0x11, 0xcd, 0x2b, 0xa0,
0x62, 0x33, 0x82, 0x2a, 0xc2, 0xf2, 0x3e, 0xb5, 0xea, 0xe4, 0x3a, 0xb5, 0xbc, 0x05, 0x2b, 0xd2,
0x86, 0x84, 0x85, 0x41, 0xa1, 0xf6, 0x37, 0x7e, 0x4d, 0xc7, 0x4c, 0x21, 0xf3, 0x9a, 0xb6, 0x5d,
0xaf, 0x46, 0xa8, 0xd0, 0x8c, 0x99, 0xf2, 0xeb, 0xe3, 0x37, 0x08, 0x14, 0x5e, 0xa6, 0xf8, 0x43,
0x58, 0xdc, 0xdf, 0xdd, 0xde, 0x7d, 0xf2, 0x6c, 0x37, 0x19, 0x51, 0xdf, 0x3b, 0x3d, 0xd3, 0x12,
0x7c, 0x79, 0xdf, 0x6d, 0xb8, 0xde, 0x6b, 0x17, 0xa7, 0x41, 0x39, 0xa8, 0x94, 0x9f, 0x25, 0x91,
0xba, 0x7c, 0x7a, 0xa6, 0xdd, 0xe0, 0x5b, 0x7c, 0x44, 0x61, 0x15, 0xe2, 0x85, 0xd2, 0x5e, 0xe5,
0xa0, 0x9c, 0x8c, 0xaa, 0xab, 0xa7, 0x67, 0x1a, 0xf0, 0x9d, 0x42, 0x95, 0xd9, 0x5d, 0x82, 0x35,
0x58, 0x2a, 0x3d, 0xd9, 0xd9, 0xa9, 0xec, 0xed, 0x95, 0x1f, 0x26, 0x63, 0xea, 0xfb, 0xa7, 0x67,
0xda, 0x0a, 0xdf, 0xf6, 0x7b, 0x25, 0x23, 0x35, 0x75, 0xf9, 0xcd, 0x8f, 0xd9, 0xc8, 0x2f, 0x3f,
0x65, 0x05, 0x82, 0xfc, 0x3f, 0x8b, 0xb0, 0xd4, 0xe7, 0x18, 0x7f, 0x07, 0x8b, 0xf2, 0x29, 0x81,
0x37, 0x67, 0x7d, 0xde, 0xa8, 0xf7, 0x67, 0xd0, 0x94, 0x24, 0x76, 0x40, 0x11, 0x11, 0x7e, 0x3a,
0xd3, 0x93, 0x40, 0xbd, 0x3b, 0xad, 0x9a, 0x74, 0xdb, 0x80, 0xb8, 0x3f, 0x6d, 0xb1, 0x31, 0xd9,
0xc2, 0xa5, 0xe1, 0xae, 0xde, 0x0e, 0xaf, 0x20, 0x9d, 0x1d, 0x42, 0xdc, 0x4f, 0x06, 0xbe, 0x37,
0xe3, 0x88, 0x53, 0xd3, 0x57, 0x2a, 0xbb, 0xcc, 0x9f, 0xe2, 0xdc, 0xb4, 0x3f, 0xf2, 0xc3, 0x98,
0x1e, 0xfb, 0x38, 0xf8, 0x5f, 0xd3, 0x1d, 0x50, 0x78, 0xe7, 0x0c, 0x93, 0x99, 0x31, 0xe3, 0x32,
0x4c, 0x66, 0xc6, 0x36, 0xe6, 0x6f, 0x21, 0xee, 0xf7, 0xa6, 0x30, 0x11, 0x8d, 0xed, 0xbf, 0xea,
0xe6, 0xf4, 0x8a, 0xd2, 0x79, 0x0f, 0x14, 0xde, 0x82, 0x70, 0x08, 0xf0, 0xe3, 0x9a, 0x9c, 0x7a,
0x6f, 0x6a, 0x3d, 0xdf, 0xf1, 0x6d, 0x84, 0x4f, 0x60, 0x41, 0xb4, 0x17, 0xac, 0x87, 0x40, 0x3f,
0xd4, 0xcb, 0x54, 0x23, 0xf4, 0x79, 0xdf, 0x57, 0xf1, 0xe5, 0xf9, 0xdb, 0x6c, 0xe4, 0xb7, 0xb7,
0xd9, 0xc8, 0xf7, 0x17, 0x59, 0x74, 0x7e, 0x91, 0x45, 0xbf, 0x5e, 0x64, 0xd1, 0x9f, 0x17, 0x59,
0xf4, 0xa2, 0x38, 0xf3, 0x2f, 0xe7, 0x56, 0x20, 0x3f, 0x8f, 0x1c, 0xc5, 0xc5, 0x45, 0xfa, 0xe4,
0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xed, 0x58, 0xc4, 0xcf, 0xc1, 0x0e, 0x00, 0x00,
0xea, 0x61, 0x37, 0xa1, 0x6a, 0xe2, 0xc4, 0x97, 0x62, 0x4c, 0x2b, 0xec, 0xd8, 0xa9, 0x36, 0xb6,
0x13, 0xa7, 0x55, 0xa3, 0x35, 0x8c, 0xf1, 0x0a, 0x76, 0x97, 0x32, 0x03, 0x11, 0xad, 0x54, 0xf5,
0x18, 0xf9, 0xd4, 0x2f, 0xe0, 0x53, 0xfb, 0x21, 0xaa, 0x7e, 0x02, 0x1f, 0x7b, 0xec, 0xa9, 0x6d,
0xfc, 0x25, 0x7a, 0xea, 0x1f, 0xcd, 0xec, 0x2c, 0x60, 0x4c, 0xc5, 0x82, 0xc9, 0x6d, 0x66, 0x67,
0x7e, 0xef, 0xfd, 0xe6, 0xf7, 0xe6, 0xbd, 0x37, 0x0b, 0xdb, 0x35, 0x9b, 0x9d, 0xb6, 0x8f, 0xf5,
0x8a, 0xe7, 0x18, 0x15, 0xcf, 0x65, 0x96, 0xed, 0x92, 0x56, 0x75, 0x70, 0x68, 0x35, 0x6d, 0x83,
0x92, 0x56, 0xc7, 0xae, 0x10, 0x6a, 0x50, 0xd7, 0x6a, 0xd2, 0x53, 0x8f, 0x51, 0xa3, 0x73, 0xaf,
0x3f, 0xd1, 0x9b, 0x2d, 0x8f, 0x79, 0x58, 0xeb, 0xa3, 0xf4, 0x00, 0xa1, 0xf7, 0x37, 0x75, 0xee,
0xa9, 0xa9, 0x9a, 0x57, 0xf3, 0xc4, 0x66, 0x83, 0x8f, 0x7c, 0x9c, 0xfa, 0x5e, 0xcd, 0xf3, 0x6a,
0x0d, 0x62, 0x88, 0xd9, 0x71, 0xfb, 0xc4, 0x20, 0x4e, 0x93, 0x75, 0xe5, 0xa2, 0x36, 0xbc, 0x78,
0x62, 0x93, 0x46, 0xf5, 0xa5, 0x63, 0xd1, 0xba, 0xdc, 0xb1, 0x3a, 0xbc, 0x83, 0xd9, 0x0e, 0xa1,
0xcc, 0x72, 0x9a, 0x72, 0xc3, 0xfd, 0x50, 0x67, 0x64, 0xdd, 0x26, 0xa1, 0x86, 0xe3, 0xb5, 0x5d,
0xe6, 0xe3, 0x72, 0x7f, 0x23, 0x48, 0x7f, 0xde, 0x22, 0x4d, 0xab, 0x45, 0x9e, 0xca, 0x53, 0x98,
0xe4, 0xeb, 0x36, 0xa1, 0x0c, 0x6b, 0x90, 0x08, 0x0e, 0xc6, 0x48, 0x2b, 0x83, 0x34, 0xb4, 0xb6,
0x60, 0x0e, 0x7e, 0xc2, 0x49, 0x88, 0xd5, 0x49, 0x37, 0x13, 0x15, 0x2b, 0x7c, 0x88, 0xd3, 0x10,
0xe7, 0xa6, 0x5c, 0x96, 0x89, 0x89, 0x8f, 0x72, 0x86, 0xbf, 0x84, 0x78, 0xc3, 0x3a, 0x26, 0x0d,
0x9a, 0x51, 0xb4, 0xd8, 0x5a, 0x22, 0xbf, 0xa5, 0x8f, 0xd3, 0x51, 0x1f, 0xcd, 0x4a, 0x7f, 0x2c,
0xcc, 0x94, 0x5c, 0xd6, 0xea, 0x9a, 0xd2, 0xa6, 0xfa, 0x10, 0x12, 0x03, 0x9f, 0x03, 0x5a, 0xa8,
0x4f, 0x2b, 0x05, 0x73, 0x1d, 0xab, 0xd1, 0x26, 0x92, 0xaa, 0x3f, 0x79, 0x14, 0x5d, 0x47, 0xb9,
0x6d, 0xb8, 0x73, 0xcd, 0x11, 0x6d, 0x7a, 0x2e, 0x25, 0xd8, 0x80, 0xb8, 0x50, 0x8a, 0x66, 0x90,
0xe0, 0x7c, 0x67, 0x90, 0xb3, 0x50, 0x52, 0xdf, 0xe5, 0xeb, 0xa6, 0xdc, 0x96, 0xfb, 0x0b, 0xc1,
0xed, 0x43, 0x9b, 0xbc, 0x7a, 0x9b, 0x42, 0x1e, 0x0d, 0x09, 0x59, 0x18, 0x2f, 0xe4, 0x08, 0x4a,
0xb3, 0x56, 0xf1, 0x33, 0x48, 0x5d, 0xf5, 0x32, 0xad, 0x84, 0x45, 0x58, 0x12, 0x1f, 0xe8, 0x0d,
0xb4, 0xcb, 0x15, 0x60, 0x39, 0x30, 0x32, 0x2d, 0x8f, 0x1d, 0x58, 0x31, 0x89, 0xe3, 0x75, 0x66,
0x91, 0x14, 0xfc, 0x5e, 0xac, 0x14, 0x3d, 0xc7, 0xb1, 0xd9, 0xe4, 0xd6, 0x30, 0x28, 0xae, 0xe5,
0x04, 0x92, 0x8b, 0x71, 0xe0, 0x21, 0xd6, 0x8f, 0xcc, 0x17, 0x43, 0xb7, 0xa2, 0x38, 0xfe, 0x56,
0x8c, 0x24, 0x34, 0xeb, 0x7b, 0x51, 0x86, 0xdb, 0x4f, 0x99, 0xc5, 0x66, 0x21, 0xe2, 0xbf, 0x51,
0x50, 0xca, 0xee, 0x89, 0xd7, 0x53, 0x04, 0x0d, 0x28, 0xd2, 0xcf, 0x96, 0xe8, 0x95, 0x6c, 0x79,
0x04, 0x4a, 0xdd, 0x76, 0xab, 0x42, 0xaa, 0xe5, 0xfc, 0x07, 0xe3, 0x55, 0xd9, 0xb1, 0xdd, 0xaa,
0x29, 0x30, 0xb8, 0x08, 0x50, 0x69, 0x11, 0x8b, 0x91, 0xea, 0x4b, 0x8b, 0x65, 0x14, 0x0d, 0xad,
0x25, 0xf2, 0xaa, 0xee, 0xd7, 0x61, 0x3d, 0xa8, 0xc3, 0xfa, 0x7e, 0x50, 0x87, 0x37, 0x6f, 0x5d,
0xfc, 0xbe, 0x1a, 0xf9, 0xe1, 0x8f, 0x55, 0x64, 0x2e, 0x48, 0x5c, 0x81, 0x71, 0x23, 0xed, 0x66,
0x35, 0x30, 0x32, 0x37, 0x89, 0x11, 0x89, 0x2b, 0x30, 0xbc, 0xdd, 0x8b, 0x6e, 0x5c, 0x44, 0x37,
0x3f, 0xfe, 0x1c, 0x5c, 0xa9, 0x59, 0x07, 0xf3, 0x39, 0xa4, 0xae, 0x06, 0x53, 0x26, 0xd7, 0x27,
0xa0, 0xd8, 0xee, 0x89, 0x27, 0x8c, 0x24, 0xc2, 0x88, 0xcc, 0xc9, 0x6d, 0x2a, 0xfc, 0xa4, 0xa6,
0x40, 0xe6, 0x7e, 0x46, 0xb0, 0x72, 0x20, 0x8e, 0x3b, 0xf9, 0x4d, 0x09, 0xbc, 0x47, 0xa7, 0xf5,
0x8e, 0x37, 0x20, 0xe1, 0x6b, 0x2d, 0x1a, 0xae, 0xb8, 0x2b, 0xa3, 0x82, 0xf4, 0x29, 0xef, 0xc9,
0xbb, 0x16, 0xad, 0x9b, 0x32, 0xa4, 0x7c, 0x9c, 0x7b, 0x01, 0xe9, 0x61, 0xe6, 0x33, 0x93, 0x65,
0x1d, 0x52, 0x8f, 0x6d, 0xda, 0x13, 0x3c, 0x7c, 0x4d, 0xcc, 0x1d, 0xc1, 0xca, 0x10, 0xf2, 0x1a,
0xa9, 0xd8, 0x94, 0xa4, 0x36, 0x61, 0xf1, 0x80, 0x5a, 0x35, 0x72, 0x93, 0x5c, 0xde, 0x80, 0x25,
0x69, 0x43, 0xd2, 0xc2, 0xa0, 0x50, 0xfb, 0x1b, 0x3f, 0xa7, 0x63, 0xa6, 0x18, 0xf3, 0x9c, 0xb6,
0x5d, 0xaf, 0x4a, 0xa8, 0x40, 0xc6, 0x4c, 0x39, 0xfb, 0xf0, 0x35, 0x02, 0x85, 0xa7, 0x29, 0x7e,
0x1f, 0xe6, 0x0f, 0xf6, 0x76, 0xf6, 0x9e, 0x3c, 0xdb, 0x4b, 0x46, 0xd4, 0x77, 0xce, 0xce, 0xb5,
0x04, 0xff, 0x7c, 0xe0, 0xd6, 0x5d, 0xef, 0x95, 0x8b, 0xd3, 0xa0, 0x1c, 0x96, 0x4b, 0xcf, 0x92,
0x48, 0x5d, 0x3c, 0x3b, 0xd7, 0x6e, 0xf1, 0x25, 0xde, 0xa2, 0xb0, 0x0a, 0xf1, 0x42, 0x71, 0xbf,
0x7c, 0x58, 0x4a, 0x46, 0xd5, 0xe5, 0xb3, 0x73, 0x0d, 0xf8, 0x4a, 0xa1, 0xc2, 0xec, 0x0e, 0xc1,
0x1a, 0x2c, 0x14, 0x9f, 0xec, 0xee, 0x96, 0xf7, 0xf7, 0x4b, 0x5b, 0xc9, 0x98, 0xfa, 0xee, 0xd9,
0xb9, 0xb6, 0xc4, 0x97, 0xfd, 0x5a, 0xc9, 0x48, 0x55, 0x5d, 0x7c, 0xfd, 0x63, 0x36, 0xf2, 0xcb,
0x4f, 0x59, 0xc1, 0x20, 0xff, 0xcf, 0x3c, 0x2c, 0xf4, 0x34, 0xc6, 0xdf, 0xc1, 0xbc, 0x7c, 0x4a,
0xe0, 0xf5, 0x69, 0x9f, 0x37, 0xea, 0xc3, 0x29, 0x90, 0x52, 0xc4, 0x36, 0x28, 0xe2, 0x84, 0x1f,
0x4f, 0xf5, 0x24, 0x50, 0xef, 0x4f, 0x0a, 0x93, 0x6e, 0xeb, 0x10, 0xf7, 0xbb, 0x2d, 0x36, 0xc6,
0x5b, 0xb8, 0xd2, 0xdc, 0xd5, 0xbb, 0xe1, 0x01, 0xd2, 0xd9, 0x11, 0xc4, 0xfd, 0x60, 0xe0, 0x07,
0x53, 0xb6, 0x38, 0x35, 0x7d, 0x2d, 0xb3, 0x4b, 0xfc, 0x29, 0xce, 0x4d, 0xfb, 0x2d, 0x3f, 0x8c,
0xe9, 0x91, 0x8f, 0x83, 0xff, 0x35, 0xdd, 0x06, 0x85, 0x57, 0xce, 0x30, 0x91, 0x19, 0xd1, 0x2e,
0xc3, 0x44, 0x66, 0x64, 0x61, 0xfe, 0x16, 0xe2, 0x7e, 0x6d, 0x0a, 0x73, 0xa2, 0x91, 0xf5, 0x57,
0x5d, 0x9f, 0x1c, 0x28, 0x9d, 0x77, 0x41, 0xe1, 0x25, 0x08, 0x87, 0x20, 0x3f, 0xaa, 0xc8, 0xa9,
0x0f, 0x26, 0xc6, 0xf9, 0x8e, 0xef, 0x22, 0x7c, 0x0a, 0x73, 0xa2, 0xbc, 0x60, 0x3d, 0x04, 0xfb,
0x81, 0x5a, 0xa6, 0x1a, 0xa1, 0xf7, 0xfb, 0xbe, 0x36, 0xbf, 0xba, 0x78, 0x93, 0x8d, 0xfc, 0xf6,
0x26, 0x1b, 0xf9, 0xfe, 0x32, 0x8b, 0x2e, 0x2e, 0xb3, 0xe8, 0xd7, 0xcb, 0x2c, 0xfa, 0xf3, 0x32,
0x8b, 0x5e, 0x6c, 0x4d, 0xff, 0xcf, 0xb9, 0xd1, 0x9b, 0x3c, 0x8f, 0x1c, 0xc7, 0xc5, 0x55, 0xfa,
0xe8, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8e, 0xa0, 0xb2, 0xda, 0xc4, 0x0e, 0x00, 0x00,
}

View file

@ -8,7 +8,7 @@ import "google/protobuf/field_mask.proto";
import "google/protobuf/timestamp.proto";
import "github.com/containerd/containerd/api/types/mount.proto";
option go_package = "github.com/containerd/containerd/api/services/snapshot/v1;snapshot";
option go_package = "github.com/containerd/containerd/api/services/snapshots/v1;snapshots";
// Snapshot service manages snapshots
service Snapshots {

View file

@ -17,7 +17,7 @@ import (
imagesapi "github.com/containerd/containerd/api/services/images/v1"
introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
versionservice "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
@ -33,7 +33,7 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -435,8 +435,8 @@ func (c *Client) ContentStore() content.Store {
}
// SnapshotService returns the underlying snapshotter for the provided snapshotter name
func (c *Client) SnapshotService(snapshotterName string) snapshot.Snapshotter {
return NewSnapshotterFromClient(snapshotapi.NewSnapshotsClient(c.conn), snapshotterName)
func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter {
return NewSnapshotterFromClient(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName)
}
// TaskService returns the underlying TasksClient

View file

@ -9,7 +9,7 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/rootfs"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -112,7 +112,7 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
"containerd.io/uncompressed": layer.Diff.Digest.String(),
}
unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels))
unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshots.WithLabels(labels))
if err != nil {
return err
}

View file

@ -11,7 +11,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
"github.com/pkg/errors"
)
@ -60,7 +60,7 @@ type DB struct {
// NewDB creates a new metadata database using the provided
// bolt database, content store, and snapshotters.
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB {
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshots.Snapshotter) *DB {
m := &DB{
db: db,
ss: make(map[string]*snapshotter, len(ss)),
@ -171,7 +171,7 @@ func (m *DB) ContentStore() content.Store {
// Snapshotter returns a namespaced content store for
// the requested snapshotter name proxied to a snapshotter.
func (m *DB) Snapshotter(name string) snapshot.Snapshotter {
func (m *DB) Snapshotter(name string) snapshots.Snapshotter {
sn, ok := m.ss[name]
if !ok {
return nil

View file

@ -14,12 +14,12 @@ import (
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
"github.com/pkg/errors"
)
type snapshotter struct {
snapshot.Snapshotter
snapshots.Snapshotter
name string
db *DB
l sync.RWMutex
@ -27,7 +27,7 @@ type snapshotter struct {
// newSnapshotter returns a new Snapshotter which namespaces the given snapshot
// using the provided name and database.
func newSnapshotter(db *DB, name string, sn snapshot.Snapshotter) *snapshotter {
func newSnapshotter(db *DB, name string, sn snapshots.Snapshotter) *snapshotter {
return &snapshotter{
Snapshotter: sn,
name: name,
@ -75,15 +75,15 @@ func (s *snapshotter) resolveKey(ctx context.Context, key string) (string, error
return id, nil
}
func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, error) {
func (s *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return snapshot.Info{}, err
return snapshots.Info{}, err
}
var (
bkey string
local = snapshot.Info{
local = snapshots.Info{
Name: key,
}
)
@ -108,33 +108,33 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro
return nil
}); err != nil {
return snapshot.Info{}, err
return snapshots.Info{}, err
}
info, err := s.Snapshotter.Stat(ctx, bkey)
if err != nil {
return snapshot.Info{}, err
return snapshots.Info{}, err
}
return overlayInfo(info, local), nil
}
func (s *snapshotter) Update(ctx context.Context, info snapshot.Info, fieldpaths ...string) (snapshot.Info, error) {
func (s *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return snapshot.Info{}, err
return snapshots.Info{}, err
}
if info.Name == "" {
return snapshot.Info{}, errors.Wrap(errdefs.ErrInvalidArgument, "")
return snapshots.Info{}, errors.Wrap(errdefs.ErrInvalidArgument, "")
}
var (
bkey string
local = snapshot.Info{
local = snapshots.Info{
Name: info.Name,
}
)
@ -195,18 +195,18 @@ func (s *snapshotter) Update(ctx context.Context, info snapshot.Info, fieldpaths
return nil
}); err != nil {
return snapshot.Info{}, err
return snapshots.Info{}, err
}
info, err = s.Snapshotter.Stat(ctx, bkey)
if err != nil {
return snapshot.Info{}, err
return snapshots.Info{}, err
}
return overlayInfo(info, local), nil
}
func overlayInfo(info, overlay snapshot.Info) snapshot.Info {
func overlayInfo(info, overlay snapshots.Info) snapshots.Info {
// Merge info
info.Name = overlay.Name
info.Created = overlay.Created
@ -222,10 +222,10 @@ func overlayInfo(info, overlay snapshot.Info) snapshot.Info {
return info
}
func (s *snapshotter) Usage(ctx context.Context, key string) (snapshot.Usage, error) {
func (s *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
bkey, err := s.resolveKey(ctx, key)
if err != nil {
return snapshot.Usage{}, err
return snapshots.Usage{}, err
}
return s.Snapshotter.Usage(ctx, bkey)
}
@ -238,15 +238,15 @@ func (s *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er
return s.Snapshotter.Mounts(ctx, bkey)
}
func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshot.Opt) ([]mount.Mount, error) {
func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
return s.createSnapshot(ctx, key, parent, false, opts)
}
func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snapshot.Opt) ([]mount.Mount, error) {
func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
return s.createSnapshot(ctx, key, parent, true, opts)
}
func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshot.Opt) ([]mount.Mount, error) {
func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshots.Opt) ([]mount.Mount, error) {
s.l.RLock()
defer s.l.RUnlock()
@ -255,7 +255,7 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
return nil, err
}
var base snapshot.Info
var base snapshots.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return nil, err
@ -336,7 +336,7 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
return m, nil
}
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshot.Opt) error {
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
s.l.RLock()
defer s.l.RUnlock()
@ -345,7 +345,7 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
return err
}
var base snapshot.Info
var base snapshots.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
@ -493,10 +493,10 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
type infoPair struct {
bkey string
info snapshot.Info
info snapshots.Info
}
func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapshot.Info) error) error {
func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
@ -533,7 +533,7 @@ func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapsho
pair := infoPair{
bkey: string(sbkt.Get(bucketKeyName)),
info: snapshot.Info{
info: snapshots.Info{
Name: string(k),
Parent: string(sbkt.Get(bucketKeyParent)),
},
@ -586,7 +586,7 @@ func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapsho
return nil
}
func validateSnapshot(info *snapshot.Info) error {
func validateSnapshot(info *snapshots.Info) error {
for k, v := range info.Labels {
if err := labels.Validate(k, v); err != nil {
return errors.Wrapf(err, "info.Labels")
@ -670,7 +670,7 @@ func (s *snapshotter) garbageCollect(ctx context.Context) (d time.Duration, err
}
type treeNode struct {
info snapshot.Info
info snapshots.Info
remove bool
children []*treeNode
}
@ -679,7 +679,7 @@ func (s *snapshotter) walkTree(ctx context.Context, seen map[string]struct{}) ([
roots := []*treeNode{}
nodes := map[string]*treeNode{}
if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error {
if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, info snapshots.Info) error {
_, isSeen := seen[info.Name]
node, ok := nodes[info.Name]
if !ok {

View file

@ -4,13 +4,13 @@ import (
"context"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// Client interface used by SpecOpt
type Client interface {
SnapshotService(snapshotterName string) snapshot.Snapshotter
SnapshotService(snapshotterName string) snapshots.Snapshotter
}
// Image interface used by some SpecOpt to query image configuration

View file

@ -9,7 +9,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -30,7 +30,7 @@ type Layer struct {
// The returned result is a chain id digest representing all the applied layers.
// Layers are applied in order they are given, making the first layer the
// bottom-most layer in the layer chain.
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshot.Snapshotter, a diff.Differ) (digest.Digest, error) {
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Differ) (digest.Digest, error) {
var chain []digest.Digest
for _, layer := range layers {
if _, err := ApplyLayer(ctx, layer, chain, sn, a); err != nil {
@ -46,7 +46,7 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshot.Snapshotter, a
// ApplyLayer applies a single layer on top of the given provided layer chain,
// using the provided snapshotter and applier. If the layer was unpacked true
// is returned, if the layer already exists false is returned.
func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshot.Snapshotter, a diff.Differ, opts ...snapshot.Opt) (bool, error) {
func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Differ, opts ...snapshots.Opt) (bool, error) {
var (
parent = identity.ChainID(chain)
chainID = identity.ChainID(append(chain, layer.Diff.Digest))

View file

@ -5,7 +5,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/net/context"
)
@ -14,7 +14,7 @@ import (
// of the snapshot. A content ref is provided to track the progress of the
// content creation and the provided snapshotter and mount differ are used
// for calculating the diff. The descriptor for the layer diff is returned.
func Diff(ctx context.Context, snapshotID string, sn snapshot.Snapshotter, d diff.Differ, opts ...diff.Opt) (ocispec.Descriptor, error) {
func Diff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter, d diff.Differ, opts ...diff.Opt) (ocispec.Descriptor, error) {
info, err := sn.Stat(ctx, snapshotID)
if err != nil {
return ocispec.Descriptor{}, err
@ -28,7 +28,7 @@ func Diff(ctx context.Context, snapshotID string, sn snapshot.Snapshotter, d dif
defer sn.Remove(ctx, lowerKey)
var upper []mount.Mount
if info.Kind == snapshot.KindActive {
if info.Kind == snapshots.KindActive {
upper, err = sn.Mounts(ctx, snapshotID)
if err != nil {
return ocispec.Descriptor{}, err

View file

@ -8,7 +8,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
@ -26,7 +26,7 @@ type Mounter interface {
}
// InitRootFS initializes the snapshot for use as a rootfs
func InitRootFS(ctx context.Context, name string, parent digest.Digest, readonly bool, snapshotter snapshot.Snapshotter, mounter Mounter) ([]mount.Mount, error) {
func InitRootFS(ctx context.Context, name string, parent digest.Digest, readonly bool, snapshotter snapshots.Snapshotter, mounter Mounter) ([]mount.Mount, error) {
_, err := snapshotter.Stat(ctx, name)
if err == nil {
return nil, errors.Errorf("rootfs already exists")
@ -51,7 +51,7 @@ func InitRootFS(ctx context.Context, name string, parent digest.Digest, readonly
return snapshotter.Prepare(ctx, name, parentS)
}
func createInitLayer(ctx context.Context, parent, initName string, initFn func(string) error, snapshotter snapshot.Snapshotter, mounter Mounter) (string, error) {
func createInitLayer(ctx context.Context, parent, initName string, initFn func(string) error, snapshotter snapshots.Snapshotter, mounter Mounter) (string, error) {
initS := fmt.Sprintf("%s %s", parent, initName)
if _, err := snapshotter.Stat(ctx, initS); err == nil {
return initS, nil

View file

@ -18,7 +18,7 @@ import (
introspection "github.com/containerd/containerd/api/services/introspection/v1"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
version "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/content"
@ -27,7 +27,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
metrics "github.com/docker/go-metrics"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
@ -199,7 +199,7 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
return nil, err
}
snapshotters := make(map[string]snapshot.Snapshotter)
snapshotters := make(map[string]snapshots.Snapshotter)
for name, sn := range snapshottersRaw {
sn, err := sn.Instance()
if err != nil {
@ -207,7 +207,7 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
Warnf("could not use snapshotter %v in metadata plugin", name)
continue
}
snapshotters[name] = sn.(snapshot.Snapshotter)
snapshotters[name] = sn.(snapshots.Snapshotter)
}
path := filepath.Join(ic.Root, "meta.db")
@ -249,7 +249,7 @@ func interceptor(
// No need to change the context
case version.VersionServer:
ctx = log.WithModule(ctx, "version")
case snapshotapi.SnapshotsServer:
case snapshotsapi.SnapshotsServer:
ctx = log.WithModule(ctx, "snapshot")
case diff.DiffServer:
ctx = log.WithModule(ctx, "diff")

View file

@ -4,17 +4,17 @@ import (
"context"
"io"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshots"
protobuftypes "github.com/gogo/protobuf/types"
)
// NewSnapshotterFromClient returns a new Snapshotter which communicates
// over a GRPC connection.
func NewSnapshotterFromClient(client snapshotapi.SnapshotsClient, snapshotterName string) snapshot.Snapshotter {
func NewSnapshotterFromClient(client snapshotsapi.SnapshotsClient, snapshotterName string) snapshots.Snapshotter {
return &remoteSnapshotter{
client: client,
snapshotterName: snapshotterName,
@ -22,25 +22,25 @@ func NewSnapshotterFromClient(client snapshotapi.SnapshotsClient, snapshotterNam
}
type remoteSnapshotter struct {
client snapshotapi.SnapshotsClient
client snapshotsapi.SnapshotsClient
snapshotterName string
}
func (r *remoteSnapshotter) Stat(ctx context.Context, key string) (snapshot.Info, error) {
func (r *remoteSnapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
resp, err := r.client.Stat(ctx,
&snapshotapi.StatSnapshotRequest{
&snapshotsapi.StatSnapshotRequest{
Snapshotter: r.snapshotterName,
Key: key,
})
if err != nil {
return snapshot.Info{}, errdefs.FromGRPC(err)
return snapshots.Info{}, errdefs.FromGRPC(err)
}
return toInfo(resp.Info), nil
}
func (r *remoteSnapshotter) Update(ctx context.Context, info snapshot.Info, fieldpaths ...string) (snapshot.Info, error) {
func (r *remoteSnapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
resp, err := r.client.Update(ctx,
&snapshotapi.UpdateSnapshotRequest{
&snapshotsapi.UpdateSnapshotRequest{
Snapshotter: r.snapshotterName,
Info: fromInfo(info),
UpdateMask: &protobuftypes.FieldMask{
@ -48,24 +48,24 @@ func (r *remoteSnapshotter) Update(ctx context.Context, info snapshot.Info, fiel
},
})
if err != nil {
return snapshot.Info{}, errdefs.FromGRPC(err)
return snapshots.Info{}, errdefs.FromGRPC(err)
}
return toInfo(resp.Info), nil
}
func (r *remoteSnapshotter) Usage(ctx context.Context, key string) (snapshot.Usage, error) {
resp, err := r.client.Usage(ctx, &snapshotapi.UsageRequest{
func (r *remoteSnapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
resp, err := r.client.Usage(ctx, &snapshotsapi.UsageRequest{
Snapshotter: r.snapshotterName,
Key: key,
})
if err != nil {
return snapshot.Usage{}, errdefs.FromGRPC(err)
return snapshots.Usage{}, errdefs.FromGRPC(err)
}
return toUsage(resp), nil
}
func (r *remoteSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
resp, err := r.client.Mounts(ctx, &snapshotapi.MountsRequest{
resp, err := r.client.Mounts(ctx, &snapshotsapi.MountsRequest{
Snapshotter: r.snapshotterName,
Key: key,
})
@ -75,14 +75,14 @@ func (r *remoteSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mou
return toMounts(resp.Mounts), nil
}
func (r *remoteSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshot.Opt) ([]mount.Mount, error) {
var local snapshot.Info
func (r *remoteSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
var local snapshots.Info
for _, opt := range opts {
if err := opt(&local); err != nil {
return nil, err
}
}
resp, err := r.client.Prepare(ctx, &snapshotapi.PrepareSnapshotRequest{
resp, err := r.client.Prepare(ctx, &snapshotsapi.PrepareSnapshotRequest{
Snapshotter: r.snapshotterName,
Key: key,
Parent: parent,
@ -94,14 +94,14 @@ func (r *remoteSnapshotter) Prepare(ctx context.Context, key, parent string, opt
return toMounts(resp.Mounts), nil
}
func (r *remoteSnapshotter) View(ctx context.Context, key, parent string, opts ...snapshot.Opt) ([]mount.Mount, error) {
var local snapshot.Info
func (r *remoteSnapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
var local snapshots.Info
for _, opt := range opts {
if err := opt(&local); err != nil {
return nil, err
}
}
resp, err := r.client.View(ctx, &snapshotapi.ViewSnapshotRequest{
resp, err := r.client.View(ctx, &snapshotsapi.ViewSnapshotRequest{
Snapshotter: r.snapshotterName,
Key: key,
Parent: parent,
@ -113,14 +113,14 @@ func (r *remoteSnapshotter) View(ctx context.Context, key, parent string, opts .
return toMounts(resp.Mounts), nil
}
func (r *remoteSnapshotter) Commit(ctx context.Context, name, key string, opts ...snapshot.Opt) error {
var local snapshot.Info
func (r *remoteSnapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
var local snapshots.Info
for _, opt := range opts {
if err := opt(&local); err != nil {
return err
}
}
_, err := r.client.Commit(ctx, &snapshotapi.CommitSnapshotRequest{
_, err := r.client.Commit(ctx, &snapshotsapi.CommitSnapshotRequest{
Snapshotter: r.snapshotterName,
Name: name,
Key: key,
@ -130,15 +130,15 @@ func (r *remoteSnapshotter) Commit(ctx context.Context, name, key string, opts .
}
func (r *remoteSnapshotter) Remove(ctx context.Context, key string) error {
_, err := r.client.Remove(ctx, &snapshotapi.RemoveSnapshotRequest{
_, err := r.client.Remove(ctx, &snapshotsapi.RemoveSnapshotRequest{
Snapshotter: r.snapshotterName,
Key: key,
})
return errdefs.FromGRPC(err)
}
func (r *remoteSnapshotter) Walk(ctx context.Context, fn func(context.Context, snapshot.Info) error) error {
sc, err := r.client.List(ctx, &snapshotapi.ListSnapshotsRequest{
func (r *remoteSnapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error {
sc, err := r.client.List(ctx, &snapshotsapi.ListSnapshotsRequest{
Snapshotter: r.snapshotterName,
})
if err != nil {
@ -167,18 +167,18 @@ func (r *remoteSnapshotter) Close() error {
return nil
}
func toKind(kind snapshotapi.Kind) snapshot.Kind {
if kind == snapshotapi.KindActive {
return snapshot.KindActive
func toKind(kind snapshotsapi.Kind) snapshots.Kind {
if kind == snapshotsapi.KindActive {
return snapshots.KindActive
}
if kind == snapshotapi.KindView {
return snapshot.KindView
if kind == snapshotsapi.KindView {
return snapshots.KindView
}
return snapshot.KindCommitted
return snapshots.KindCommitted
}
func toInfo(info snapshotapi.Info) snapshot.Info {
return snapshot.Info{
func toInfo(info snapshotsapi.Info) snapshots.Info {
return snapshots.Info{
Name: info.Name,
Parent: info.Parent,
Kind: toKind(info.Kind),
@ -188,8 +188,8 @@ func toInfo(info snapshotapi.Info) snapshot.Info {
}
}
func toUsage(resp *snapshotapi.UsageResponse) snapshot.Usage {
return snapshot.Usage{
func toUsage(resp *snapshotsapi.UsageResponse) snapshots.Usage {
return snapshots.Usage{
Inodes: resp.Inodes,
Size: resp.Size_,
}
@ -207,18 +207,18 @@ func toMounts(mm []*types.Mount) []mount.Mount {
return mounts
}
func fromKind(kind snapshot.Kind) snapshotapi.Kind {
if kind == snapshot.KindActive {
return snapshotapi.KindActive
func fromKind(kind snapshots.Kind) snapshotsapi.Kind {
if kind == snapshots.KindActive {
return snapshotsapi.KindActive
}
if kind == snapshot.KindView {
return snapshotapi.KindView
if kind == snapshots.KindView {
return snapshotsapi.KindView
}
return snapshotapi.KindCommitted
return snapshotsapi.KindCommitted
}
func fromInfo(info snapshot.Info) snapshotapi.Info {
return snapshotapi.Info{
func fromInfo(info snapshots.Info) snapshotsapi.Info {
return snapshotsapi.Info{
Name: info.Name,
Parent: info.Parent,
Kind: fromKind(info.Kind),

View file

@ -1,4 +1,4 @@
package snapshot
package snapshots
import (
"context"

View file

@ -1,7 +1,7 @@
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/containerd/go-runc ed1cbe1fc31f5fb2359d3a54b6330d1a097858b7
github.com/containerd/console 84eeaae905fa414d03e07bcd6c8d3f19e7cf180e
github.com/containerd/cgroups f7dd103d3e4e696aa67152f6b4ddd1779a3455a9
github.com/containerd/cgroups 29da22c6171a4316169f9205ab6c49f59b5b852f
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
github.com/docker/go-metrics 8fd5772bf1584597834c6f7961a530f06cbfbb87
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
@ -41,4 +41,4 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
github.com/dmcgowan/go-tar go1.10
github.com/stevvooe/ttrpc bdb2ab7a8169e485e39421e666e15a505e575fd2
github.com/stevvooe/ttrpc 8c92e22ce0c492875ccaac3ab06143a77d8ed0c1

View file

@ -1,5 +1,7 @@
# ttrpc
[![Build Status](https://travis-ci.org/stevvooe/ttrpc.svg?branch=master)](https://travis-ci.org/stevvooe/ttrpc)
GRPC for low-memory environments.
The existing grpc-go project requires a lot of memory overhead for importing

View file

@ -5,13 +5,16 @@ import (
"context"
"encoding/binary"
"io"
"sync"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
messageHeaderLength = 10
messageLengthMax = 8 << 10
messageLengthMax = 4 << 20
)
type messageType uint8
@ -54,6 +57,8 @@ func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error {
return err
}
var buffers sync.Pool
type channel struct {
bw *bufio.Writer
br *bufio.Reader
@ -68,21 +73,32 @@ func newChannel(w io.Writer, r io.Reader) *channel {
}
}
func (ch *channel) recv(ctx context.Context, p []byte) (messageHeader, error) {
// recv a message from the channel. The returned buffer contains the message.
//
// If a valid grpc status is returned, the message header
// returned will be valid and caller should send that along to
// the correct consumer. The bytes on the underlying channel
// will be discarded.
func (ch *channel) recv(ctx context.Context) (messageHeader, []byte, error) {
mh, err := readMessageHeader(ch.hrbuf[:], ch.br)
if err != nil {
return messageHeader{}, err
return messageHeader{}, nil, err
}
if mh.Length > uint32(len(p)) {
return messageHeader{}, errors.Wrapf(io.ErrShortBuffer, "message length %v over buffer size %v", mh.Length, len(p))
if mh.Length > uint32(messageLengthMax) {
if _, err := ch.br.Discard(int(mh.Length)); err != nil {
return mh, nil, errors.Wrapf(err, "failed to discard after receiving oversized message")
}
return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax)
}
if _, err := io.ReadFull(ch.br, p[:mh.Length]); err != nil {
return messageHeader{}, errors.Wrapf(err, "failed reading message")
p := ch.getmbuf(int(mh.Length))
if _, err := io.ReadFull(ch.br, p); err != nil {
return messageHeader{}, nil, errors.Wrapf(err, "failed reading message")
}
return mh, nil
return mh, p, nil
}
func (ch *channel) send(ctx context.Context, streamID uint32, t messageType, p []byte) error {
@ -97,3 +113,23 @@ func (ch *channel) send(ctx context.Context, streamID uint32, t messageType, p [
return ch.bw.Flush()
}
func (ch *channel) getmbuf(size int) []byte {
// we can't use the standard New method on pool because we want to allocate
// based on size.
b, ok := buffers.Get().(*[]byte)
if !ok || cap(*b) < size {
// TODO(stevvooe): It may be better to allocate these in fixed length
// buckets to reduce fragmentation but its not clear that would help
// with performance. An ilogb approach or similar would work well.
bb := make([]byte, size)
b = &bb
} else {
*b = (*b)[:size]
}
return *b
}
func (ch *channel) putmbuf(p []byte) {
buffers.Put(&p)
}

View file

@ -4,19 +4,18 @@ import (
"context"
"net"
"sync"
"sync/atomic"
"github.com/containerd/containerd/log"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"google.golang.org/grpc/status"
)
type Client struct {
codec codec
channel *channel
requestID uint32
sendRequests chan sendRequest
recvRequests chan recvRequest
codec codec
conn net.Conn
channel *channel
calls chan *callRequest
closed chan struct{}
closeOnce sync.Once
@ -26,50 +25,76 @@ type Client struct {
func NewClient(conn net.Conn) *Client {
c := &Client{
codec: codec{},
requestID: 1,
channel: newChannel(conn, conn),
sendRequests: make(chan sendRequest),
recvRequests: make(chan recvRequest),
closed: make(chan struct{}),
done: make(chan struct{}),
codec: codec{},
conn: conn,
channel: newChannel(conn, conn),
calls: make(chan *callRequest),
closed: make(chan struct{}),
done: make(chan struct{}),
}
go c.run()
return c
}
type callRequest struct {
ctx context.Context
req *Request
resp *Response // response will be written back here
errs chan error // error written here on completion
}
func (c *Client) Call(ctx context.Context, service, method string, req, resp interface{}) error {
payload, err := c.codec.Marshal(req)
if err != nil {
return err
}
requestID := atomic.AddUint32(&c.requestID, 2)
request := Request{
Service: service,
Method: method,
Payload: payload,
}
var (
creq = &Request{
Service: service,
Method: method,
Payload: payload,
}
if err := c.send(ctx, requestID, &request); err != nil {
cresp = &Response{}
)
if err := c.dispatch(ctx, creq, cresp); err != nil {
return err
}
var response Response
if err := c.recv(ctx, requestID, &response); err != nil {
if err := c.codec.Unmarshal(cresp.Payload, resp); err != nil {
return err
}
if err := c.codec.Unmarshal(response.Payload, resp); err != nil {
return err
}
if response.Status == nil {
if cresp.Status == nil {
return errors.New("no status provided on response")
}
return status.ErrorProto(response.Status)
return status.ErrorProto(cresp.Status)
}
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
errs := make(chan error, 1)
call := &callRequest{
req: req,
resp: resp,
errs: errs,
}
select {
case c.calls <- call:
case <-c.done:
return c.err
}
select {
case err := <-errs:
return err
case <-c.done:
return c.err
}
}
func (c *Client) Close() error {
@ -80,92 +105,42 @@ func (c *Client) Close() error {
return nil
}
type sendRequest struct {
ctx context.Context
id uint32
msg interface{}
err chan error
}
func (c *Client) send(ctx context.Context, id uint32, msg interface{}) error {
errs := make(chan error, 1)
select {
case c.sendRequests <- sendRequest{
ctx: ctx,
id: id,
msg: msg,
err: errs,
}:
case <-ctx.Done():
return ctx.Err()
case <-c.done:
return c.err
}
select {
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
case <-c.done:
return c.err
}
}
type recvRequest struct {
id uint32
msg interface{}
err chan error
}
func (c *Client) recv(ctx context.Context, id uint32, msg interface{}) error {
errs := make(chan error, 1)
select {
case c.recvRequests <- recvRequest{
id: id,
msg: msg,
err: errs,
}:
case <-c.done:
return c.err
case <-ctx.Done():
return ctx.Err()
}
select {
case err := <-errs:
return err
case <-c.done:
return c.err
case <-ctx.Done():
return ctx.Err()
}
}
type received struct {
mh messageHeader
type message struct {
messageHeader
p []byte
err error
}
func (c *Client) run() {
defer close(c.done)
var (
waiters = map[uint32]recvRequest{}
queued = map[uint32]received{} // messages unmatched by waiter
incoming = make(chan received)
streamID uint32 = 1
waiters = make(map[uint32]*callRequest)
calls = c.calls
incoming = make(chan *message)
shutdown = make(chan struct{})
shutdownErr error
)
go func() {
defer close(shutdown)
// start one more goroutine to recv messages without blocking.
for {
var p [messageLengthMax]byte
mh, err := c.channel.recv(context.TODO(), p[:])
mh, p, err := c.channel.recv(context.TODO())
if err != nil {
_, ok := status.FromError(err)
if !ok {
// treat all errors that are not an rpc status as terminal.
// all others poison the connection.
shutdownErr = err
return
}
}
select {
case incoming <- received{
mh: mh,
p: p[:mh.Length],
err: err,
case incoming <- &message{
messageHeader: mh,
p: p[:mh.Length],
err: err,
}:
case <-c.done:
return
@ -173,32 +148,64 @@ func (c *Client) run() {
}
}()
defer c.conn.Close()
defer close(c.done)
for {
select {
case req := <-c.sendRequests:
if p, err := proto.Marshal(req.msg.(proto.Message)); err != nil {
req.err <- err
} else {
req.err <- c.channel.send(req.ctx, req.id, messageTypeRequest, p)
}
case req := <-c.recvRequests:
if r, ok := queued[req.id]; ok {
req.err <- proto.Unmarshal(r.p, req.msg.(proto.Message))
}
waiters[req.id] = req
case r := <-incoming:
if r.err != nil {
c.err = r.err
return
case call := <-calls:
if err := c.send(call.ctx, streamID, messageTypeRequest, call.req); err != nil {
call.errs <- err
continue
}
if waiter, ok := waiters[r.mh.StreamID]; ok {
waiter.err <- proto.Unmarshal(r.p, waiter.msg.(proto.Message))
} else {
queued[r.mh.StreamID] = r
waiters[streamID] = call
streamID += 2 // enforce odd client initiated request ids
case msg := <-incoming:
call, ok := waiters[msg.StreamID]
if !ok {
log.L.Errorf("ttrpc: received message for unknown channel %v", msg.StreamID)
continue
}
call.errs <- c.recv(call.resp, msg)
delete(waiters, msg.StreamID)
case <-shutdown:
shutdownErr = errors.Wrapf(shutdownErr, "ttrpc: client shutting down")
c.err = shutdownErr
for _, waiter := range waiters {
waiter.errs <- shutdownErr
}
c.Close()
return
case <-c.closed:
// broadcast the shutdown error to the remaining waiters.
for _, waiter := range waiters {
waiter.errs <- shutdownErr
}
return
}
}
}
func (c *Client) send(ctx context.Context, streamID uint32, mtype messageType, msg interface{}) error {
p, err := c.codec.Marshal(msg)
if err != nil {
return err
}
return c.channel.send(ctx, streamID, mtype, p)
}
func (c *Client) recv(resp *Response, msg *message) error {
if msg.err != nil {
return msg.err
}
if msg.Type != messageTypeResponse {
return errors.New("unkown message type received")
}
defer c.channel.putmbuf(msg.p)
return proto.Unmarshal(msg.p, resp)
}

View file

@ -2,21 +2,38 @@ package ttrpc
import (
"context"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
ErrServerClosed = errors.New("ttrpc: server close")
)
type Server struct {
services *serviceSet
codec codec
mu sync.Mutex
listeners map[net.Listener]struct{}
connections map[*serverConn]struct{} // all connections to current state
done chan struct{} // marks point at which we stop serving requests
}
func NewServer() *Server {
return &Server{
services: newServiceSet(),
services: newServiceSet(),
done: make(chan struct{}),
listeners: make(map[net.Listener]struct{}),
connections: make(map[*serverConn]struct{}),
}
}
@ -24,28 +41,208 @@ func (s *Server) Register(name string, methods map[string]Method) {
s.services.register(name, methods)
}
func (s *Server) Shutdown(ctx context.Context) error {
// TODO(stevvooe): Wait on connection shutdown.
return nil
}
func (s *Server) Serve(l net.Listener) error {
s.addListener(l)
defer s.closeListener(l)
var (
ctx = context.Background()
backoff time.Duration
)
for {
conn, err := l.Accept()
if err != nil {
log.L.WithError(err).Error("failed accept")
continue
select {
case <-s.done:
return ErrServerClosed
default:
}
if terr, ok := err.(interface {
Temporary() bool
}); ok && terr.Temporary() {
if backoff == 0 {
backoff = time.Millisecond
} else {
backoff *= 2
}
if max := time.Second; backoff > max {
backoff = max
}
sleep := time.Duration(rand.Int63n(int64(backoff)))
log.L.WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
time.Sleep(sleep)
continue
}
return err
}
go s.handleConn(conn)
backoff = 0
sc := s.newConn(conn)
go sc.run(ctx)
}
}
func (s *Server) Shutdown(ctx context.Context) error {
s.mu.Lock()
lnerr := s.closeListeners()
select {
case <-s.done:
default:
// protected by mutex
close(s.done)
}
s.mu.Unlock()
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
if s.closeIdleConns() {
return lnerr
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}
// Close the server without waiting for active connections.
func (s *Server) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
select {
case <-s.done:
default:
// protected by mutex
close(s.done)
}
err := s.closeListeners()
for c := range s.connections {
c.close()
delete(s.connections, c)
}
return err
}
func (s *Server) addListener(l net.Listener) {
s.mu.Lock()
defer s.mu.Unlock()
s.listeners[l] = struct{}{}
}
func (s *Server) closeListener(l net.Listener) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.closeListenerLocked(l)
}
func (s *Server) closeListenerLocked(l net.Listener) error {
defer delete(s.listeners, l)
return l.Close()
}
func (s *Server) closeListeners() error {
var err error
for l := range s.listeners {
if cerr := s.closeListenerLocked(l); cerr != nil && err == nil {
err = cerr
}
}
return err
}
func (s *Server) addConnection(c *serverConn) {
s.mu.Lock()
defer s.mu.Unlock()
s.connections[c] = struct{}{}
}
func (s *Server) closeIdleConns() bool {
s.mu.Lock()
defer s.mu.Unlock()
quiescent := true
for c := range s.connections {
st, ok := c.getState()
if !ok || st != connStateIdle {
quiescent = false
continue
}
c.close()
delete(s.connections, c)
}
return quiescent
}
type connState int
const (
connStateActive = iota + 1 // outstanding requests
connStateIdle // no requests
connStateClosed // closed connection
)
func (cs connState) String() string {
switch cs {
case connStateActive:
return "active"
case connStateIdle:
return "idle"
case connStateClosed:
return "closed"
default:
return "unknown"
}
}
func (s *Server) newConn(conn net.Conn) *serverConn {
c := &serverConn{
server: s,
conn: conn,
shutdown: make(chan struct{}),
}
c.setState(connStateIdle)
s.addConnection(c)
return c
}
type serverConn struct {
server *Server
conn net.Conn
state atomic.Value
shutdownOnce sync.Once
shutdown chan struct{} // forced shutdown, used by close
}
func (c *serverConn) getState() (connState, bool) {
cs, ok := c.state.Load().(connState)
return cs, ok
}
func (c *serverConn) setState(newstate connState) {
c.state.Store(newstate)
}
func (c *serverConn) close() error {
c.shutdownOnce.Do(func() {
close(c.shutdown)
})
return nil
}
func (s *Server) handleConn(conn net.Conn) {
defer conn.Close()
func (c *serverConn) run(sctx context.Context) {
type (
request struct {
id uint32
@ -59,25 +256,66 @@ func (s *Server) handleConn(conn net.Conn) {
)
var (
ch = newChannel(conn, conn)
ctx, cancel = context.WithCancel(context.Background())
responses = make(chan response)
requests = make(chan request)
recvErr = make(chan error, 1)
done = make(chan struct{})
ch = newChannel(c.conn, c.conn)
ctx, cancel = context.WithCancel(sctx)
active int
state connState = connStateIdle
responses = make(chan response)
requests = make(chan request)
recvErr = make(chan error, 1)
shutdown = c.shutdown
done = make(chan struct{})
)
defer c.conn.Close()
defer cancel()
defer close(done)
go func() {
go func(recvErr chan error) {
defer close(recvErr)
var p [messageLengthMax]byte
sendImmediate := func(id uint32, st *status.Status) bool {
select {
case responses <- response{
// even though we've had an invalid stream id, we send it
// back on the same stream id so the client knows which
// stream id was bad.
id: id,
resp: &Response{
Status: st.Proto(),
},
}:
return true
case <-c.shutdown:
return false
case <-done:
return false
}
}
for {
mh, err := ch.recv(ctx, p[:])
if err != nil {
recvErr <- err
select {
case <-c.shutdown:
return
case <-done:
return
default: // proceed
}
mh, p, err := ch.recv(ctx)
if err != nil {
status, ok := status.FromError(err)
if !ok {
recvErr <- err
return
}
// in this case, we send an error for that particular message
// when the status is defined.
if !sendImmediate(mh.StreamID, status) {
return
}
continue
}
if mh.Type != messageTypeRequest {
@ -86,44 +324,57 @@ func (s *Server) handleConn(conn net.Conn) {
}
var req Request
if err := s.codec.Unmarshal(p[:mh.Length], &req); err != nil {
recvErr <- err
return
if err := c.server.codec.Unmarshal(p, &req); err != nil {
ch.putmbuf(p)
if !sendImmediate(mh.StreamID, status.Newf(codes.InvalidArgument, "unmarshal request error: %v", err)) {
return
}
continue
}
ch.putmbuf(p)
if mh.StreamID%2 != 1 {
// enforce odd client initiated identifiers.
select {
case responses <- response{
// even though we've had an invalid stream id, we send it
// back on the same stream id so the client knows which
// stream id was bad.
id: mh.StreamID,
resp: &Response{
Status: status.New(codes.InvalidArgument, "StreamID must be odd for client initiated streams").Proto(),
},
}:
case <-done:
if !sendImmediate(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID must be odd for client initiated streams")) {
return
}
continue
}
// Forward the request to the main loop. We don't wait on s.done
// because we have already accepted the client request.
select {
case requests <- request{
id: mh.StreamID,
req: &req,
}:
case <-done:
return
}
}
}()
}(recvErr)
for {
newstate := state
switch {
case active > 0:
newstate = connStateActive
shutdown = nil
case active == 0:
newstate = connStateIdle
shutdown = c.shutdown // only enable this branch in idle mode
}
if newstate != state {
c.setState(newstate)
state = newstate
}
select {
case request := <-requests:
active++
go func(id uint32) {
p, status := s.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
resp := &Response{
Status: status.Proto(),
Payload: p,
@ -138,17 +389,27 @@ func (s *Server) handleConn(conn net.Conn) {
}
}(request.id)
case response := <-responses:
p, err := s.codec.Marshal(response.resp)
p, err := c.server.codec.Marshal(response.resp)
if err != nil {
log.L.WithError(err).Error("failed marshaling response")
return
}
if err := ch.send(ctx, response.id, messageTypeResponse, p); err != nil {
log.L.WithError(err).Error("failed sending message on channel")
return
}
active--
case err := <-recvErr:
log.L.WithError(err).Error("error receiving message")
// TODO(stevvooe): Not wildly clear what we should do in this
// branch. Basically, it means that we are no longer receiving
// requests due to a terminal error.
recvErr = nil // connection is now "closing"
if err != nil {
log.L.WithError(err).Error("error receiving message")
}
case <-shutdown:
return
}
}