vendor: github.com/hashicorp/serf v0.8.2, go-msgpack v0.5.3

un-pin these dependencies

full diff: https://github.com/hashicorp/serf/compare/598c54895cc5...v0.8.2
full diff: https://github.com/hashicorp/go-msgpack/compare/71c2886f5a67...v0.5.3

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Sebastiaan van Stijn 2022-06-29 15:12:45 +02:00
parent f3b2df7b0b
commit 06a7f41488
17 changed files with 880 additions and 347 deletions

vendor.mod

@@ -137,7 +137,6 @@ require (
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/tinylib/msgp v1.1.0 // indirect
github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea // indirect
github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.2 // indirect
go.etcd.io/etcd/raft/v3 v3.5.2 // indirect
@@ -163,14 +162,9 @@ require (
google.golang.org/api v0.54.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
labix.org/v2/mgo v0.0.0-20140701140051-000000000287 // indirect
)
replace (
github.com/armon/go-radix => github.com/armon/go-radix v0.0.0-20150105235045-e39d623f12e8
github.com/hashicorp/go-msgpack => github.com/hashicorp/go-msgpack v0.0.0-20140221154404-71c2886f5a67
github.com/hashicorp/serf => github.com/hashicorp/serf v0.7.1-0.20160317193612-598c54895cc5
)
replace github.com/armon/go-radix => github.com/armon/go-radix v0.0.0-20150105235045-e39d623f12e8
// Removes etcd dependency
replace github.com/rexray/gocsi => github.com/dperny/gocsi v1.2.3-pre
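
Un-pinning here amounts to deleting the two replace pins so the tagged releases in the require block take effect. As a generic sketch of the same operation using standard Go module tooling (this tree manages vendoring through vendor.mod and its own scripts, so the exact invocation differs):

go mod edit -dropreplace=github.com/hashicorp/serf
go mod edit -dropreplace=github.com/hashicorp/go-msgpack
go mod edit -require=github.com/hashicorp/serf@v0.8.2
go mod edit -require=github.com/hashicorp/go-msgpack@v0.5.3
go mod tidy
go mod vendor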

vendor.sum

@@ -122,6 +122,7 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5
github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0=
github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 h1:7Ip0wMmLHLRJdrloDxZfhMm0xrLXZS8+COSu2bXmEQs=
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@@ -613,8 +614,8 @@ github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJ
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-memdb v1.3.2 h1:RBKHOsnSszpU6vxq80LzC2BaQjuuvoyaQbkLTf7V7g8=
github.com/hashicorp/go-memdb v1.3.2/go.mod h1:Mluclgwib3R93Hk5fxEfiRhB+6Dar64wWh71LpNSe3g=
github.com/hashicorp/go-msgpack v0.0.0-20140221154404-71c2886f5a67 h1:uUGuA3Cnfp7qbFpIMOCDVz3TaWIF4lLYM8PE3YHpoA4=
github.com/hashicorp/go-msgpack v0.0.0-20140221154404-71c2886f5a67/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
@@ -624,18 +625,23 @@ github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc=
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/memberlist v0.2.4 h1:OOhYzSvFnkFQXm1ysE8RjXTHsqSRDyP4emusC9K7DYg=
github.com/hashicorp/memberlist v0.2.4/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/serf v0.7.1-0.20160317193612-598c54895cc5 h1:c0ibg28HTeX1LYjzYBkMzdnXuPn+iGx3huEB//PkB08=
github.com/hashicorp/serf v0.7.1-0.20160317193612-598c54895cc5/go.mod h1:h/Ru6tmZazX7WO/GDmwdpS975F019L4t5ng5IgwbNrE=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hugelgupf/socketpair v0.0.0-20190730060125-05d35a94e714/go.mod h1:2Goc3h8EklBH5mspfHFxBnEoURQCGzQQH1ga9Myjvis=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@@ -726,6 +732,7 @@ github.com/mdlayher/netlink v1.1.0/go.mod h1:H4WCitaheIsdF9yOYu8CFmCgQthAPIWZmcK
github.com/mdlayher/netlink v1.1.1/go.mod h1:WTYpFb/WTvlRJAyKhZL5/uy69TDDpHHu2VZmb2XgV7o=
github.com/mdlayher/raw v0.0.0-20190606142536-fef19f00fc18/go.mod h1:7EpbotpCmVZcu+KCX4g9WaRNuu11uyhiW7+Le1dKawg=
github.com/mdlayher/raw v0.0.0-20191009151244-50f2db8cc065/go.mod h1:7EpbotpCmVZcu+KCX4g9WaRNuu11uyhiW7+Le1dKawg=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.27 h1:aEH/kqUzUxGJ/UHcEKdJY+ugH6WEzsEBBSPa8zuy1aM=
github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
@@ -737,8 +744,11 @@ github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A=
github.com/moby/buildkit v0.10.3 h1:/dGykD8FW+H4p++q5+KqKEo6gAkYKyBQHdawdjVwVAU=
@@ -918,6 +928,7 @@ github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
github.com/safchain/ethtool v0.0.0-20210803160452-9aa261dae9b1/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
@@ -1020,8 +1031,6 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@@ -1131,6 +1140,7 @@ go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181009213950-7c1a557ab941/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -1185,7 +1195,9 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -1274,6 +1286,7 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -1729,8 +1742,6 @@ k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
labix.org/v2/mgo v0.0.0-20140701140051-000000000287 h1:L0cnkNl4TfAXzvdrqsYEmxOHOCv2p5I3taaReO8BWFs=
labix.org/v2/mgo v0.0.0-20140701140051-000000000287/go.mod h1:Lg7AYkt1uXJoR9oeSZ3W/8IXLdvOfIITgZnommstyz4=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

vendor/github.com/hashicorp/serf/coordinate/client.go

@@ -6,6 +6,8 @@ import (
"sort"
"sync"
"time"
"github.com/armon/go-metrics"
)
// Client manages the estimated network coordinate for a given node, and adjusts
@@ -34,10 +36,20 @@ type Client struct {
// value to determine how many samples we keep, per node.
latencyFilterSamples map[string][]float64
// stats is used to record events that occur when updating coordinates.
stats ClientStats
// mutex enables safe concurrent access to the client.
mutex sync.RWMutex
}
// ClientStats is used to record events that occur when updating coordinates.
type ClientStats struct {
// Resets is incremented any time we reset our local coordinate because
// our calculations have resulted in an invalid state.
Resets int
}
// NewClient creates a new Client and verifies the configuration is valid.
func NewClient(config *Config) (*Client, error) {
if !(config.Dimensionality > 0) {
@@ -63,11 +75,16 @@ func (c *Client) GetCoordinate() *Coordinate {
}
// SetCoordinate forces the client's coordinate to a known state.
func (c *Client) SetCoordinate(coord *Coordinate) {
func (c *Client) SetCoordinate(coord *Coordinate) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if err := c.checkCoordinate(coord); err != nil {
return err
}
c.coord = coord.Clone()
return nil
}
// ForgetNode removes any client state for the given node.
@@ -78,6 +95,29 @@ func (c *Client) ForgetNode(node string) {
delete(c.latencyFilterSamples, node)
}
// Stats returns a copy of stats for the client.
func (c *Client) Stats() ClientStats {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.stats
}
// checkCoordinate returns an error if the coordinate isn't compatible with
// this client, or if the coordinate itself isn't valid. This assumes the mutex
// has been locked already.
func (c *Client) checkCoordinate(coord *Coordinate) error {
if !c.coord.IsCompatibleWith(coord) {
return fmt.Errorf("dimensions aren't compatible")
}
if !coord.IsValid() {
return fmt.Errorf("coordinate is invalid")
}
return nil
}
// latencyFilter applies a simple moving median filter with a new sample for
// a node. This assumes that the mutex has been locked already.
func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
@@ -159,15 +199,38 @@ func (c *Client) updateGravity() {
// Update takes other, a coordinate for another node, and rtt, a round trip
// time observation for a ping to that node, and updates the estimated position of
// the client's coordinate. Returns the updated coordinate.
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) *Coordinate {
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coordinate, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if err := c.checkCoordinate(other); err != nil {
return nil, err
}
// The code down below can handle zero RTTs, which we have seen in
// https://github.com/hashicorp/consul/issues/3789, presumably in
// environments with coarse-grained monotonic clocks (we are still
// trying to pin this down). In any event, this is ok from a code PoV
// so we don't need to alert operators with spammy messages. We did
// add a counter so this is still observable, though.
const maxRTT = 10 * time.Second
if rtt < 0 || rtt > maxRTT {
return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT)
}
if rtt == 0 {
metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1)
}
rttSeconds := c.latencyFilter(node, rtt.Seconds())
c.updateVivaldi(other, rttSeconds)
c.updateAdjustment(other, rttSeconds)
c.updateGravity()
return c.coord.Clone()
if !c.coord.IsValid() {
c.stats.Resets++
c.coord = NewCoordinate(c.config)
}
return c.coord.Clone(), nil
}
// DistanceTo returns the estimated RTT from the client's coordinate to other, the
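
The coordinate client now validates its inputs and surfaces failures instead of absorbing them: SetCoordinate and Update return errors, and an update that produces an invalid coordinate resets the local state and increments ClientStats.Resets. A minimal sketch of the new calling convention; the node name and RTT are illustrative:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/serf/coordinate"
)

func main() {
	client, err := coordinate.NewClient(coordinate.DefaultConfig())
	if err != nil {
		panic(err)
	}
	// Stand-in for a coordinate received from a peer.
	other := coordinate.NewCoordinate(coordinate.DefaultConfig())
	// Update now rejects incompatible dimensions, NaN/Inf components,
	// and out-of-range RTTs instead of corrupting the local state.
	coord, err := client.Update("node-a", other, 25*time.Millisecond)
	if err != nil {
		fmt.Println("update rejected:", err)
		return
	}
	fmt.Printf("coordinate %v, estimated RTT %v, resets %d\n",
		coord.Vec, client.DistanceTo(other), client.Stats().Resets)
}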

vendor/github.com/hashicorp/serf/coordinate/config.go

@@ -16,7 +16,7 @@ package coordinate
type Config struct {
// The dimensionality of the coordinate system. As discussed in [2], more
// dimensions improves the accuracy of the estimates up to a point. Per [2]
// we chose 4 dimensions plus a non-Euclidean height.
// we chose 8 dimensions plus a non-Euclidean height.
Dimensionality uint
// VivaldiErrorMax is the default error value when a node hasn't yet made

vendor/github.com/hashicorp/serf/coordinate/coordinate.go

@@ -72,6 +72,26 @@ func (c *Coordinate) Clone() *Coordinate {
}
}
// componentIsValid returns false if a floating point value is a NaN or an
// infinity.
func componentIsValid(f float64) bool {
return !math.IsInf(f, 0) && !math.IsNaN(f)
}
// IsValid returns false if any component of a coordinate isn't valid, per the
// componentIsValid() helper above.
func (c *Coordinate) IsValid() bool {
for i := range c.Vec {
if !componentIsValid(c.Vec[i]) {
return false
}
}
return componentIsValid(c.Error) &&
componentIsValid(c.Adjustment) &&
componentIsValid(c.Height)
}
// IsCompatibleWith checks to see if the two coordinates are compatible
// dimensionally. If this returns true then you are guaranteed to not get
// any runtime errors operating on them.
@@ -122,7 +142,7 @@ func (c *Coordinate) rawDistanceTo(other *Coordinate) float64 {
// already been checked to be compatible.
func add(vec1 []float64, vec2 []float64) []float64 {
ret := make([]float64, len(vec1))
for i, _ := range ret {
for i := range ret {
ret[i] = vec1[i] + vec2[i]
}
return ret
@@ -132,7 +152,7 @@ func add(vec1 []float64, vec2 []float64) []float64 {
// dimensions have already been checked to be compatible.
func diff(vec1 []float64, vec2 []float64) []float64 {
ret := make([]float64, len(vec1))
for i, _ := range ret {
for i := range ret {
ret[i] = vec1[i] - vec2[i]
}
return ret
@@ -141,7 +161,7 @@ func diff(vec1 []float64, vec2 []float64) []float64 {
// mul returns vec multiplied by a scalar factor.
func mul(vec []float64, factor float64) []float64 {
ret := make([]float64, len(vec))
for i, _ := range vec {
for i := range vec {
ret[i] = vec[i] * factor
}
return ret
@@ -150,7 +170,7 @@ func mul(vec []float64, factor float64) []float64 {
// magnitude computes the magnitude of the vec.
func magnitude(vec []float64) float64 {
sum := 0.0
for i, _ := range vec {
for i := range vec {
sum += vec[i] * vec[i]
}
return math.Sqrt(sum)
@@ -168,7 +188,7 @@ func unitVectorAt(vec1 []float64, vec2 []float64) ([]float64, float64) {
}
// Otherwise, just return a random unit vector.
for i, _ := range ret {
for i := range ret {
ret[i] = rand.Float64() - 0.5
}
if mag := magnitude(ret); mag > zeroThreshold {
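
IsValid gives callers a cheap guard against coordinates poisoned by NaN or infinity before they spread through gossip. A small illustration:

package main

import (
	"fmt"
	"math"

	"github.com/hashicorp/serf/coordinate"
)

func main() {
	c := coordinate.NewCoordinate(coordinate.DefaultConfig())
	fmt.Println(c.IsValid()) // true: new coordinates start at the origin
	c.Vec[0] = math.NaN()
	fmt.Println(c.IsValid()) // false: one bad component taints the whole coordinate
}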

vendor/github.com/hashicorp/serf/serf/broadcast.go

@@ -16,6 +16,9 @@ func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}
// implements memberlist.UniqueBroadcast
func (b *broadcast) UniqueBroadcast() {}
func (b *broadcast) Message() []byte {
return b.msg
}
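
UniqueBroadcast is a marker interface from memberlist: implementing it declares each broadcast intrinsically unique, letting the transmit queue skip its Invalidates scan for duplicates. A sketch of the pattern for a hypothetical broadcast type:

package main

import "github.com/hashicorp/memberlist"

type myBroadcast struct {
	msg []byte
}

func (b *myBroadcast) Invalidates(other memberlist.Broadcast) bool { return false }
func (b *myBroadcast) Message() []byte                             { return b.msg }
func (b *myBroadcast) Finished()                                   {}

// UniqueBroadcast is purely a marker; the method body stays empty.
func (b *myBroadcast) UniqueBroadcast() {}

// Compile-time proof that the marker interface is satisfied.
var _ memberlist.UniqueBroadcast = (*myBroadcast)(nil)

func main() {}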

vendor/github.com/hashicorp/serf/serf/config.go

@@ -2,6 +2,7 @@ package serf
import (
"io"
"log"
"os"
"time"
@@ -15,6 +16,7 @@ var ProtocolVersionMap map[uint8]uint8
func init() {
ProtocolVersionMap = map[uint8]uint8{
5: 2,
4: 2,
3: 2,
2: 2,
@@ -53,6 +55,13 @@ type Config struct {
// set, a timeout of 5 seconds will be set.
BroadcastTimeout time.Duration
// LeavePropagateDelay is for our leave (node dead) message to propagate
// through the cluster. In particular, we want to stay up long enough to
// service any probes from other nodes before they learn about us
// leaving and stop probing. Otherwise, we risk getting node failures as
// we leave.
LeavePropagateDelay time.Duration
// The settings below relate to Serf's event coalescence feature. Serf
// is able to coalesce multiple events into single events in order to
// reduce the amount of noise that is sent along the EventCh. For example
@@ -103,6 +112,17 @@ type Config struct {
ReconnectTimeout time.Duration
TombstoneTimeout time.Duration
// FlapTimeout is the amount of time less than which we consider a node
// being failed and rejoining looks like a flap for telemetry purposes.
// This should be set less than a typical reboot time, but large enough
// to see actual events, given our expected detection times for a failed
// node.
FlapTimeout time.Duration
// QueueCheckInterval is the interval at which we check the message
// queue to apply the warning and max depth.
QueueCheckInterval time.Duration
// QueueDepthWarning is used to generate warning message if the
// number of queued messages to broadcast exceeds this number. This
// is to provide the user feedback if events are being triggered
@@ -114,12 +134,18 @@ type Config struct {
// prevent an unbounded growth of memory utilization
MaxQueueDepth int
// RecentIntentBuffer is used to set the size of recent join and leave intent
// messages that will be buffered. This is used to guard against
// the case where Serf broadcasts an intent that arrives before the
// Memberlist event. It is important that this not be too small to avoid
// continuous rebroadcasting of dead events.
RecentIntentBuffer int
// MinQueueDepth, if >0 will enforce a lower limit for dropping messages
// and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This
// defaults to 0 which disables this dynamic sizing feature. If this is
// >0 then MaxQueueDepth will be ignored.
MinQueueDepth int
// RecentIntentTimeout is used to determine how long we store recent
// join and leave intents. This is used to guard against the case where
// Serf broadcasts an intent that arrives before the Memberlist event.
// It is important that this not be too short to avoid continuous
// rebroadcasting of dead events.
RecentIntentTimeout time.Duration
// EventBuffer is used to control how many events are buffered.
// This is used to prevent re-delivery of events to a client. The buffer
@@ -175,6 +201,12 @@ type Config struct {
// logs will go to stderr.
LogOutput io.Writer
// Logger is a custom logger which you provide. If Logger is set, it will use
// this for the internal logger. If Logger is not set, it will fall back to the
// behavior for using LogOutput. You cannot specify both LogOutput and Logger
// at the same time.
Logger *log.Logger
// SnapshotPath if provided is used to snapshot live nodes as well
// as lamport clock values. When Serf is started with a snapshot,
// it will attempt to join all the previously known nodes until one
@@ -230,17 +262,20 @@ func DefaultConfig() *Config {
return &Config{
NodeName: hostname,
BroadcastTimeout: 5 * time.Second,
LeavePropagateDelay: 1 * time.Second,
EventBuffer: 512,
QueryBuffer: 512,
LogOutput: os.Stderr,
ProtocolVersion: ProtocolVersionMax,
ProtocolVersion: 4,
ReapInterval: 15 * time.Second,
RecentIntentBuffer: 128,
RecentIntentTimeout: 5 * time.Minute,
ReconnectInterval: 30 * time.Second,
ReconnectTimeout: 24 * time.Hour,
QueueCheckInterval: 30 * time.Second,
QueueDepthWarning: 128,
MaxQueueDepth: 4096,
TombstoneTimeout: 24 * time.Hour,
FlapTimeout: 60 * time.Second,
MemberlistConfig: memberlist.DefaultLANConfig(),
QueryTimeoutMult: 16,
QueryResponseSizeLimit: 1024,
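
Taken together, the new fields govern shutdown timing (LeavePropagateDelay), flap telemetry (FlapTimeout), queue policing (QueueCheckInterval, MinQueueDepth) and logging (Logger). A sketch of a config exercising them; the values are illustrative, not recommendations:

package main

import (
	"log"
	"os"
	"time"

	"github.com/hashicorp/serf/serf"
)

func main() {
	conf := serf.DefaultConfig()
	conf.NodeName = "node-a"
	// Logger takes precedence over LogOutput; set only one of them.
	conf.Logger = log.New(os.Stderr, "serf: ", log.LstdFlags)
	conf.LeavePropagateDelay = 3 * time.Second
	conf.FlapTimeout = 2 * time.Minute
	conf.QueueCheckInterval = 10 * time.Second
	// With MinQueueDepth > 0, MaxQueueDepth is ignored and the effective
	// limit becomes max(MinQueueDepth, 2*clusterSize).
	conf.MinQueueDepth = 1024

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Shutdown()
}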

vendor/github.com/hashicorp/serf/serf/delegate.go

@@ -1,9 +1,12 @@
package serf
import (
"bytes"
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)
// delegate is the memberlist.Delegate implementation that Serf uses.
@@ -11,6 +14,8 @@ type delegate struct {
serf *Serf
}
var _ memberlist.Delegate = &delegate{}
func (d *delegate) NodeMeta(limit int) []byte {
roleBytes := d.serf.encodeTags(d.serf.config.Tags)
if len(roleBytes) > limit {
@@ -83,6 +88,25 @@ func (d *delegate) NotifyMsg(buf []byte) {
d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
d.serf.handleQueryResponse(&resp)
case messageRelayType:
var header relayHeader
var handle codec.MsgpackHandle
reader := bytes.NewReader(buf[1:])
decoder := codec.NewDecoder(reader, &handle)
if err := decoder.Decode(&header); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err)
break
}
// The remaining contents are the message itself, so forward that
raw := make([]byte, reader.Len())
reader.Read(raw)
d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String())
if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil {
d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err)
break
}
default:
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
}
@@ -202,13 +226,16 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
d.serf.queryClock.Witness(pp.QueryLTime - 1)
}
// Process the left nodes first to avoid the LTimes from being increment
// in the wrong order
// Process the left nodes first to avoid the LTimes from incrementing
// in the wrong order. Note that we don't have the actual Lamport time
// for the leave message, so we go one past the join time, since the
// leave must have been accepted after that to get onto the left members
// list. If we didn't do this then the message would not get processed.
leftMap := make(map[string]struct{}, len(pp.LeftMembers))
leave := messageLeave{}
for _, name := range pp.LeftMembers {
leftMap[name] = struct{}{}
leave.LTime = pp.StatusLTimes[name]
leave.LTime = pp.StatusLTimes[name] + 1
leave.Node = name
d.serf.handleNodeLeaveIntent(&leave)
}
@@ -230,7 +257,8 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
// If we are doing a join, and eventJoinIgnore is set
// then we set the eventMinTime to the EventLTime. This
// prevents any of the incoming events from being processed
if isJoin && d.serf.eventJoinIgnore {
eventJoinIgnore := d.serf.eventJoinIgnore.Load().(bool)
if isJoin && eventJoinIgnore {
d.serf.eventLock.Lock()
if pp.EventLTime > d.serf.eventMinTime {
d.serf.eventMinTime = pp.EventLTime
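
For reference, the relay envelope handled above is one messageRelayType byte, a msgpack-encoded relayHeader naming the final recipient, then the wrapped message verbatim. A self-contained sketch of the receiving side that mirrors the NotifyMsg branch; relayHeader is redeclared locally and the type byte value is a stand-in:

package main

import (
	"bytes"
	"fmt"
	"net"

	"github.com/hashicorp/go-msgpack/codec"
)

// relayHeader mirrors serf's unexported header type.
type relayHeader struct {
	DestAddr net.UDPAddr
}

// decodeRelay splits a relayed packet into the final destination and the
// wrapped message, the same way the delegate does before calling SendTo.
func decodeRelay(pkt []byte) (*net.UDPAddr, []byte, error) {
	var header relayHeader
	var handle codec.MsgpackHandle
	reader := bytes.NewReader(pkt[1:]) // skip the relay type byte
	if err := codec.NewDecoder(reader, &handle).Decode(&header); err != nil {
		return nil, nil, fmt.Errorf("decoding relay header: %v", err)
	}
	raw := make([]byte, reader.Len()) // whatever remains is the message
	reader.Read(raw)
	return &header.DestAddr, raw, nil
}

func main() {
	// Build a relayed packet by hand to exercise decodeRelay.
	var pkt bytes.Buffer
	pkt.WriteByte(0x08) // stand-in for the messageRelayType byte
	handle := codec.MsgpackHandle{}
	dest := net.UDPAddr{IP: net.ParseIP("10.0.0.7"), Port: 7946}
	if err := codec.NewEncoder(&pkt, &handle).Encode(relayHeader{DestAddr: dest}); err != nil {
		panic(err)
	}
	pkt.WriteString("wrapped message bytes")

	addr, raw, err := decodeRelay(pkt.Bytes())
	fmt.Println(addr, string(raw), err)
}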

vendor/github.com/hashicorp/serf/serf/event.go

@@ -95,18 +95,19 @@ func (u UserEvent) String() string {
return fmt.Sprintf("user-event: %s", u.Name)
}
// Query is the struct used EventQuery type events
// Query is the struct used by EventQuery type events
type Query struct {
LTime LamportTime
Name string
Payload []byte
serf *Serf
id uint32 // ID is not exported, since it may change
addr []byte // Address to respond to
port uint16 // Port to respond to
deadline time.Time // Must respond by this deadline
respLock sync.Mutex
serf *Serf
id uint32 // ID is not exported, since it may change
addr []byte // Address to respond to
port uint16 // Port to respond to
deadline time.Time // Must respond by this deadline
relayFactor uint8 // Number of duplicate responses to relay back to sender
respLock sync.Mutex
}
func (q *Query) EventType() EventType {
@@ -122,47 +123,74 @@ func (q *Query) Deadline() time.Time {
return q.deadline
}
// Respond is used to send a response to the user query
func (q *Query) Respond(buf []byte) error {
q.respLock.Lock()
defer q.respLock.Unlock()
// Check if we've already responded
if q.deadline.IsZero() {
return fmt.Errorf("Response already sent")
}
// Ensure we aren't past our response deadline
if time.Now().After(q.deadline) {
return fmt.Errorf("Response is past the deadline")
}
func (q *Query) createResponse(buf []byte) messageQueryResponse {
// Create response
resp := messageQueryResponse{
return messageQueryResponse{
LTime: q.LTime,
ID: q.id,
From: q.serf.config.NodeName,
Payload: buf,
}
}
// Format the response
raw, err := encodeMessage(messageQueryResponseType, &resp)
if err != nil {
return fmt.Errorf("Failed to format response: %v", err)
}
// Check the size limit
if len(raw) > q.serf.config.QueryResponseSizeLimit {
// Check response size
func (q *Query) checkResponseSize(resp []byte) error {
if len(resp) > q.serf.config.QueryResponseSizeLimit {
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
return nil
}
// Send the response
func (q *Query) respondWithMessageAndResponse(raw []byte, resp messageQueryResponse) error {
// Check the size limit
if err := q.checkResponseSize(raw); err != nil {
return err
}
q.respLock.Lock()
defer q.respLock.Unlock()
// Check if we've already responded
if q.deadline.IsZero() {
return fmt.Errorf("response already sent")
}
// Ensure we aren't past our response deadline
if time.Now().After(q.deadline) {
return fmt.Errorf("response is past the deadline")
}
// Send the response directly to the originator
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
return err
}
// Clera the deadline, response sent
// Relay the response through up to relayFactor other nodes
if err := q.serf.relayResponse(q.relayFactor, addr, &resp); err != nil {
return err
}
// Clear the deadline, responses sent
q.deadline = time.Time{}
return nil
}
// Respond is used to send a response to the user query
func (q *Query) Respond(buf []byte) error {
// Create response
resp := q.createResponse(buf)
// Encode response
raw, err := encodeMessage(messageQueryResponseType, resp)
if err != nil {
return fmt.Errorf("failed to format response: %v", err)
}
if err := q.respondWithMessageAndResponse(raw, resp); err != nil {
return fmt.Errorf("failed to respond to key query: %v", err)
}
return nil
}
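
From a consumer's point of view the split is invisible: handlers still call Respond, which now also relays duplicate copies when the query asked for them. A minimal event-loop sketch; the channel size and payload are illustrative:

package main

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

func main() {
	eventCh := make(chan serf.Event, 64)
	conf := serf.DefaultConfig()
	conf.EventCh = eventCh

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Shutdown()

	for e := range eventCh {
		q, ok := e.(*serf.Query)
		if !ok {
			continue
		}
		// Respond encodes the payload, enforces the size limit, answers
		// the originator directly, and relays copies if RelayFactor was set.
		if err := q.Respond([]byte("pong")); err != nil {
			log.Printf("query response failed: %v", err)
		}
	}
}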

vendor/github.com/hashicorp/serf/serf/internal_query.go

@@ -2,6 +2,7 @@ package serf
import (
"encoding/base64"
"fmt"
"log"
"strings"
)
@@ -28,6 +29,13 @@ const (
// listKeysQuery is used to list all known keys in the cluster
listKeysQuery = "list-keys"
// minEncodedKeyLength is used to compute the max number of keys in a list key
// response. eg 1024/25 = 40. a message with max size of 1024 bytes cannot
// contain more than 40 keys. There is a test
// (TestSerfQueries_estimateMaxKeysInListKeyResponse) which does the
// computation and in case of changes, the value can be adjusted.
minEncodedKeyLength = 25
)
// internalQueryName is used to generate a query name for an internal query
@@ -149,17 +157,62 @@ func (s *serfQueries) handleConflict(q *Query) {
}
}
func (s *serfQueries) keyListResponseWithCorrectSize(q *Query, resp *nodeKeyResponse) ([]byte, messageQueryResponse, error) {
maxListKeys := q.serf.config.QueryResponseSizeLimit / minEncodedKeyLength
actual := len(resp.Keys)
for i := maxListKeys; i >= 0; i-- {
buf, err := encodeMessage(messageKeyResponseType, resp)
if err != nil {
return nil, messageQueryResponse{}, err
}
// Create response
qresp := q.createResponse(buf)
// Encode response
raw, err := encodeMessage(messageQueryResponseType, qresp)
if err != nil {
return nil, messageQueryResponse{}, err
}
// Check the size limit
if err = q.checkResponseSize(raw); err != nil {
resp.Keys = resp.Keys[0:i]
resp.Message = fmt.Sprintf("truncated key list response, showing first %d of %d keys", i, actual)
continue
}
if actual > i {
s.logger.Printf("[WARN] serf: %s", resp.Message)
}
return raw, qresp, nil
}
return nil, messageQueryResponse{}, fmt.Errorf("Failed to truncate response so that it fits into message")
}
// sendKeyResponse handles responding to key-related queries.
func (s *serfQueries) sendKeyResponse(q *Query, resp *nodeKeyResponse) {
buf, err := encodeMessage(messageKeyResponseType, resp)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode key response: %v", err)
return
}
if err := q.Respond(buf); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err)
return
switch q.Name {
case internalQueryName(listKeysQuery):
raw, qresp, err := s.keyListResponseWithCorrectSize(q, resp)
if err != nil {
s.logger.Printf("[ERR] serf: %v", err)
return
}
if err := q.respondWithMessageAndResponse(raw, qresp); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err)
return
}
default:
buf, err := encodeMessage(messageKeyResponseType, resp)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode key response: %v", err)
return
}
if err := q.Respond(buf); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err)
return
}
}
}
@@ -192,10 +245,12 @@ func (s *serfQueries) handleInstallKey(q *Query) {
goto SEND
}
if err := s.serf.writeKeyringFile(); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
goto SEND
if s.serf.config.KeyringFile != "" {
if err := s.serf.writeKeyringFile(); err != nil {
response.Message = err.Error()
s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err)
goto SEND
}
}
response.Result = true
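
The loop above is a size-bounded truncation: start from the estimated maximum (QueryResponseSizeLimit / minEncodedKeyLength, e.g. 1024/25 = 40 keys) and drop keys until the encoded response fits. The same idea distilled into a standalone helper; the names and encode callback are illustrative:

package main

import (
	"errors"
	"fmt"
	"strings"
)

// truncateToFit drops trailing items until the encoded form fits within
// limit, returning the encoding and how many items survived.
func truncateToFit(keys []string, limit int, encode func([]string) ([]byte, error)) ([]byte, int, error) {
	for i := len(keys); i >= 0; i-- {
		buf, err := encode(keys[:i])
		if err != nil {
			return nil, 0, err
		}
		if len(buf) <= limit {
			return buf, i, nil
		}
	}
	return nil, 0, errors.New("response cannot be truncated to fit")
}

func main() {
	keys := []string{"aaaa", "bbbb", "cccc", "dddd"}
	enc := func(ks []string) ([]byte, error) { return []byte(strings.Join(ks, ",")), nil }
	buf, kept, err := truncateToFit(keys, 9, enc)
	fmt.Println(string(buf), kept, err) // "aaaa,bbbb" 2 <nil>
}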

vendor/github.com/hashicorp/serf/serf/keymanager.go

@@ -33,6 +33,13 @@ type KeyResponse struct {
Keys map[string]int
}
// KeyRequestOptions is used to contain optional parameters for a keyring operation
type KeyRequestOptions struct {
// RelayFactor is the number of duplicate query responses to send by relaying through
// other nodes, for redundancy
RelayFactor uint8
}
// streamKeyResp takes care of reading responses from a channel and composing
// them into a KeyResponse. It will update a KeyResponse *in place* and
// therefore has nothing to return.
@@ -61,6 +68,11 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
resp.NumErr++
}
if nodeResponse.Result && len(nodeResponse.Message) > 0 {
resp.Messages[r.From] = nodeResponse.Message
k.serf.logger.Println("[WARN] serf:", nodeResponse.Message)
}
// Currently only used for key list queries, this adds keys to a counter
// and increments them for each node response which contains them.
for _, key := range nodeResponse.Keys {
@@ -83,7 +95,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
// handleKeyRequest performs query broadcasting to all members for any type of
// key operation and manages gathering responses and packing them up into a
// KeyResponse for uniform response handling.
func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) {
resp := &KeyResponse{
Messages: make(map[string]string),
Keys: make(map[string]int),
@@ -103,6 +115,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
}
qParam := k.serf.DefaultQueryParams()
if opts != nil {
qParam.RelayFactor = opts.RelayFactor
}
queryResp, err := k.serf.Query(qName, req, qParam)
if err != nil {
return resp, err
@@ -127,30 +142,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
// responses from each of them, returning a list of messages from each node
// and any applicable error conditions.
func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
return k.InstallKeyWithOptions(key, nil)
}
func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, installKeyQuery)
return k.handleKeyRequest(key, installKeyQuery, opts)
}
// UseKey handles broadcasting a primary key change to all members in the
// cluster, and gathering any response messages. If successful, there should
// be an empty KeyResponse returned.
func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
return k.UseKeyWithOptions(key, nil)
}
func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, useKeyQuery)
return k.handleKeyRequest(key, useKeyQuery, opts)
}
// RemoveKey handles broadcasting a key to the cluster for removal. Each member
// will receive this event, and if they have the key in their keyring, remove
// it. If any errors are encountered, RemoveKey will collect and relay them.
func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
return k.RemoveKeyWithOptions(key, nil)
}
func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.Lock()
defer k.l.Unlock()
return k.handleKeyRequest(key, removeKeyQuery)
return k.handleKeyRequest(key, removeKeyQuery, opts)
}
// ListKeys is used to collect installed keys from members in a Serf cluster
@@ -159,8 +186,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
// Since having multiple keys installed can cause performance penalties in some
// cases, it's important to verify this information and remove unneeded keys.
func (k *KeyManager) ListKeys() (*KeyResponse, error) {
return k.ListKeysWithOptions(nil)
}
func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) {
k.l.RLock()
defer k.l.RUnlock()
return k.handleKeyRequest("", listKeysQuery)
return k.handleKeyRequest("", listKeysQuery, opts)
}
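
The *WithOptions variants keep the original single-argument methods source-compatible while letting callers opt into relayed responses. A hypothetical key-rotation helper showing the flow; the relay factor of 2 is arbitrary:

package main

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// rotateKey is a hypothetical helper: install a new key everywhere, make it
// primary, then list keys to confirm, relaying responses for redundancy.
func rotateKey(s *serf.Serf, newKey string) error {
	km := s.KeyManager()
	opts := &serf.KeyRequestOptions{RelayFactor: 2}
	if _, err := km.InstallKeyWithOptions(newKey, opts); err != nil {
		return err
	}
	if _, err := km.UseKeyWithOptions(newKey, opts); err != nil {
		return err
	}
	resp, err := km.ListKeysWithOptions(opts)
	if err != nil {
		return err
	}
	log.Printf("%d/%d nodes answered the key list query", resp.NumResp, resp.NumNodes)
	return nil
}

func main() {}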

vendor/github.com/hashicorp/serf/serf/messages.go

@@ -2,8 +2,10 @@ package serf
import (
"bytes"
"github.com/hashicorp/go-msgpack/codec"
"net"
"time"
"github.com/hashicorp/go-msgpack/codec"
)
// messageType are the types of gossip messages Serf will send along
@@ -20,6 +22,7 @@ const (
messageConflictResponseType
messageKeyRequestType
messageKeyResponseType
messageRelayType
)
const (
@@ -75,15 +78,16 @@ type messageUserEvent struct {
// messageQuery is used for query events
type messageQuery struct {
LTime LamportTime // Event lamport time
ID uint32 // Query ID, randomly generated
Addr []byte // Source address, used for a direct reply
Port uint16 // Source port, used for a direct reply
Filters [][]byte // Potential query filters
Flags uint32 // Used to provide various flags
Timeout time.Duration // Maximum time between delivery and response
Name string // Query name
Payload []byte // Query payload
LTime LamportTime // Event lamport time
ID uint32 // Query ID, randomly generated
Addr []byte // Source address, used for a direct reply
Port uint16 // Source port, used for a direct reply
Filters [][]byte // Potential query filters
Flags uint32 // Used to provide various flags
RelayFactor uint8 // Used to set the number of duplicate relayed responses
Timeout time.Duration // Maximum time between delivery and response
Name string // Query name
Payload []byte // Query payload
}
// Ack checks if the ack flag is set
@@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
return buf.Bytes(), err
}
// relayHeader is used to store the end destination of a relayed message
type relayHeader struct {
DestAddr net.UDPAddr
}
// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
// address of the end recipient to the front of the message
func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
buf.WriteByte(uint8(messageRelayType))
if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil {
return nil, err
}
buf.WriteByte(uint8(t))
err := encoder.Encode(msg)
return buf.Bytes(), err
}
func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(f))

vendor/github.com/hashicorp/serf/serf/ping_delegate.go

@@ -2,7 +2,6 @@ package serf
import (
"bytes"
"log"
"time"
"github.com/armon/go-metrics"
@@ -37,7 +36,7 @@ func (p *pingDelegate) AckPayload() []byte {
// The rest of the message is the serialized coordinate.
enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil {
log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
p.serf.logger.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
}
return buf.Bytes()
}
@@ -52,7 +51,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
// Verify ping version in the header.
version := payload[0]
if version != PingVersion {
log.Printf("[ERR] serf: Unsupported ping version: %v", version)
p.serf.logger.Printf("[ERR] serf: Unsupported ping version: %v", version)
return
}
@@ -61,29 +60,31 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
var coord coordinate.Coordinate
if err := dec.Decode(&coord); err != nil {
log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
p.serf.logger.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
return
}
// Apply the update. Since this is a coordinate coming from some place
// else we harden this and look for dimensionality problems proactively.
// Apply the update.
before := p.serf.coordClient.GetCoordinate()
if before.IsCompatibleWith(&coord) {
after := p.serf.coordClient.Update(other.Name, &coord, rtt)
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
// users call GetCachedCoordinate with our node name, which is
// more friendly.
p.serf.coordCacheLock.Lock()
p.serf.coordCache[other.Name] = &coord
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
p.serf.coordCacheLock.Unlock()
} else {
log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
if err != nil {
metrics.IncrCounter([]string{"serf", "coordinate", "rejected"}, 1)
p.serf.logger.Printf("[TRACE] serf: Rejected coordinate from %s: %v\n",
other.Name, err)
return
}
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
// users call GetCachedCoordinate with our node name, which is
// more friendly.
p.serf.coordCacheLock.Lock()
p.serf.coordCache[other.Name] = &coord
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
p.serf.coordCacheLock.Unlock()
}
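
The hardened path trades log spam for counters: serf.coordinate.zero-rtt for suspicious zero RTTs, serf.coordinate.rejected for coordinates failing validation, plus the existing adjustment-ms sample. Observing them requires a go-metrics sink; a minimal in-memory setup might look like this, with an arbitrary service name:

package main

import (
	"time"

	metrics "github.com/armon/go-metrics"
)

func main() {
	// Aggregate samples into 10s intervals and retain one minute of history.
	sink := metrics.NewInmemSink(10*time.Second, time.Minute)
	// Install as the global sink that serf's metrics calls write into.
	if _, err := metrics.NewGlobal(metrics.DefaultConfig("my-app"), sink); err != nil {
		panic(err)
	}
	// ... create serf and run; counters such as serf.coordinate.rejected
	// now accumulate here and can be inspected via sink.Data().
	_ = sink.Data()
}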

vendor/github.com/hashicorp/serf/serf/query.go

@@ -1,7 +1,11 @@
package serf
import (
"errors"
"fmt"
"math"
"math/rand"
"net"
"regexp"
"sync"
"time"
@@ -24,6 +28,10 @@ type QueryParam struct {
// send an ack.
RequestAck bool
// RelayFactor controls the number of duplicate responses to relay
// back to the sender through other nodes for redundancy.
RelayFactor uint8
// The timeout limits how long the query is left open. If not provided,
// then a default timeout is used based on the configuration of Serf
Timeout time.Duration
@@ -93,6 +101,10 @@ type QueryResponse struct {
// respCh is used to send a response from a node
respCh chan NodeResponse
// acks/responses are used to track the nodes that have sent an ack/response
acks map[string]struct{}
responses map[string]struct{}
closed bool
closeLock sync.Mutex
}
@@ -100,13 +112,15 @@ type QueryResponse struct {
// newQueryResponse is used to construct a new query response
func newQueryResponse(n int, q *messageQuery) *QueryResponse {
resp := &QueryResponse{
deadline: time.Now().Add(q.Timeout),
id: q.ID,
lTime: q.LTime,
respCh: make(chan NodeResponse, n),
deadline: time.Now().Add(q.Timeout),
id: q.ID,
lTime: q.LTime,
respCh: make(chan NodeResponse, n),
responses: make(map[string]struct{}),
}
if q.Ack() {
resp.ackCh = make(chan string, n)
resp.acks = make(map[string]struct{})
}
return resp
}
@@ -135,6 +149,8 @@ func (r *QueryResponse) Deadline() time.Time {
// Finished returns if the query is finished running
func (r *QueryResponse) Finished() bool {
r.closeLock.Lock()
defer r.closeLock.Unlock()
return r.closed || time.Now().After(r.deadline)
}
@@ -151,6 +167,22 @@ func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
return r.respCh
}
// sendResponse sends a response on the response channel ensuring the channel is not closed.
func (r *QueryResponse) sendResponse(nr NodeResponse) error {
r.closeLock.Lock()
defer r.closeLock.Unlock()
if r.closed {
return nil
}
select {
case r.respCh <- nr:
r.responses[nr.From] = struct{}{}
default:
return errors.New("serf: Failed to deliver query response, dropping")
}
return nil
}
// NodeResponse is used to represent a single response from a node
type NodeResponse struct {
From string
@@ -208,3 +240,74 @@ func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
}
return true
}
// relayResponse will relay a copy of the given response to up to relayFactor
// other members.
func (s *Serf) relayResponse(relayFactor uint8, addr net.UDPAddr, resp *messageQueryResponse) error {
if relayFactor == 0 {
return nil
}
// Needs to be worth it; we need to have at least relayFactor *other*
// nodes. If you have a tiny cluster then the relayFactor shouldn't
// be needed.
members := s.Members()
if len(members) < int(relayFactor)+1 {
return nil
}
// Prep the relay message, which is a wrapped version of the original.
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
if err != nil {
return fmt.Errorf("failed to format relayed response: %v", err)
}
if len(raw) > s.config.QueryResponseSizeLimit {
return fmt.Errorf("relayed response exceeds limit of %d bytes", s.config.QueryResponseSizeLimit)
}
// Relay to a random set of peers.
localName := s.LocalMember().Name
relayMembers := kRandomMembers(int(relayFactor), members, func(m Member) bool {
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == localName
})
for _, m := range relayMembers {
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
if err := s.memberlist.SendTo(&relayAddr, raw); err != nil {
return fmt.Errorf("failed to send relay response: %v", err)
}
}
return nil
}
// kRandomMembers selects up to k members from a given list, optionally
// filtering by the given filterFunc
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
n := len(members)
kMembers := make([]Member, 0, k)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(kMembers) < k; i++ {
// Get random member
idx := rand.Intn(n)
member := members[idx]
// Give the filter a shot at it.
if filterFunc != nil && filterFunc(member) {
continue OUTER
}
// Check if we have this member already
for j := 0; j < len(kMembers); j++ {
if member.Name == kMembers[j].Name {
continue OUTER
}
}
// Append the member
kMembers = append(kMembers, member)
}
return kMembers
}
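
Relaying is driven entirely by the querier: setting RelayFactor in QueryParam makes each responder send up to that many extra copies through random peers, and the new acks/responses maps count and drop the duplicates on arrival. A usage sketch assuming an existing *serf.Serf; the query name and payload are illustrative:

package main

import (
	"fmt"

	"github.com/hashicorp/serf/serf"
)

// pingCluster is a hypothetical helper that runs a query with relayed
// responses enabled and prints whatever comes back.
func pingCluster(s *serf.Serf) error {
	params := s.DefaultQueryParams()
	// Ask each responder to relay 2 duplicate copies through other
	// nodes, in case the direct UDP path back to us is lossy.
	params.RelayFactor = 2

	resp, err := s.Query("ping", []byte("hello"), params)
	if err != nil {
		return err
	}
	// Duplicates arriving via relays are filtered out, so each node
	// shows up at most once on this channel.
	for r := range resp.ResponseCh() {
		fmt.Printf("%s answered: %s\n", r.From, r.Payload)
	}
	return nil
}

func main() {}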

vendor/github.com/hashicorp/serf/serf/serf.go

@@ -10,8 +10,10 @@ import (
"log"
"math/rand"
"net"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
@@ -25,7 +27,7 @@ import (
// version to memberlist below.
const (
ProtocolVersionMin uint8 = 2
ProtocolVersionMax = 4
ProtocolVersionMax = 5
)
const (
@ -65,16 +67,15 @@ type Serf struct {
memberLock sync.RWMutex
members map[string]*memberState
// Circular buffers for recent intents, used
// in case we get the intent before the relevant event
recentLeave []nodeIntent
recentLeaveIndex int
recentJoin []nodeIntent
recentJoinIndex int
// recentIntents the lamport time and type of intent for a given node in
// case we get an intent before the relevant memberlist event. This is
// indexed by node, and always store the latest lamport time / intent
// we've seen. The memberLock protects this structure.
recentIntents map[string]nodeIntent
eventBroadcasts *memberlist.TransmitLimitedQueue
eventBuffer []*userEvents
eventJoinIgnore bool
eventJoinIgnore atomic.Value
eventMinTime LamportTime
eventLock sync.RWMutex
@@ -179,10 +180,18 @@ type memberState struct {
leaveTime time.Time // wall clock time of leave
}
// nodeIntent is used to buffer intents for out-of-order deliveries
// nodeIntent is used to buffer intents for out-of-order deliveries.
type nodeIntent struct {
// Type is the intent being tracked. Only messageJoinType and
// messageLeaveType are tracked.
Type messageType
// WallTime is the wall clock time we saw this intent in order to
// expire it from the buffer.
WallTime time.Time
// LTime is the Lamport time, used for cluster-wide ordering of events.
LTime LamportTime
Node string
}
// userEvent is used to buffer events to prevent re-delivery
@@ -233,14 +242,24 @@ func Create(conf *Config) (*Serf, error) {
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
logger := conf.Logger
if logger == nil {
logOutput := conf.LogOutput
if logOutput == nil {
logOutput = os.Stderr
}
logger = log.New(logOutput, "", log.LstdFlags)
}
serf := &Serf{
config: conf,
logger: log.New(conf.LogOutput, "", log.LstdFlags),
logger: logger,
members: make(map[string]*memberState),
queryResponse: make(map[LamportTime]*QueryResponse),
shutdownCh: make(chan struct{}),
state: SerfAlive,
}
serf.eventJoinIgnore.Store(false)
// Check that the meta data length is okay
if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
@@ -295,7 +314,6 @@ func Create(conf *Config) (*Serf, error) {
conf.RejoinAfterLeave,
serf.logger,
&serf.clock,
serf.coordClient,
conf.EventCh,
serf.shutdownCh)
if err != nil {
@@ -321,27 +339,20 @@ func Create(conf *Config) (*Serf, error) {
// Setup the various broadcast queues, which we use to send our own
// custom broadcasts along the gossip channel.
serf.broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
// Create the buffer for recent intents
serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer)
serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer)
serf.recentIntents = make(map[string]nodeIntent)
// Create a buffer for events and queries
serf.eventBuffer = make([]*userEvents, conf.EventBuffer)
@@ -493,15 +504,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes
// Create a message
q := messageQuery{
LTime: s.queryClock.Time(),
ID: uint32(rand.Int31()),
Addr: local.Addr,
Port: local.Port,
Filters: filters,
Flags: flags,
Timeout: params.Timeout,
Name: name,
Payload: payload,
LTime: s.queryClock.Time(),
ID: uint32(rand.Int31()),
Addr: local.Addr,
Port: local.Port,
Filters: filters,
Flags: flags,
RelayFactor: params.RelayFactor,
Timeout: params.Timeout,
Name: name,
Payload: payload,
}
// Encode the query
@@ -582,9 +594,9 @@ func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
// Ignore any events from a potential join. This is safe since we hold
// the joinLock and nobody else can be doing a Join
if ignoreOld {
s.eventJoinIgnore = true
s.eventJoinIgnore.Store(true)
defer func() {
s.eventJoinIgnore = false
s.eventJoinIgnore.Store(false)
}()
}
@@ -679,6 +691,13 @@ func (s *Serf) Leave() error {
return err
}
// Wait for the leave to propagate through the cluster. The broadcast
// timeout is how long we wait for the message to go out from our own
// queue, but this wait is for that message to propagate through the
// cluster. In particular, we want to stay up long enough to service
// any probes from other nodes before they learn about us leaving.
time.Sleep(s.config.LeavePropagateDelay)
// Transition to Left only if we not already shutdown
s.stateLock.Lock()
if s.state != SerfShutdown {
@@ -785,13 +804,15 @@ func (s *Serf) Shutdown() error {
s.logger.Printf("[WARN] serf: Shutdown without a Leave")
}
// Wait to close the shutdown channel until after we've shut down the
// memberlist and its associated network resources, since the shutdown
// channel signals that we are cleaned up outside of Serf.
s.state = SerfShutdown
close(s.shutdownCh)
err := s.memberlist.Shutdown()
if err != nil {
return err
}
close(s.shutdownCh)
// Wait for the snapshoter to finish if we have one
if s.snapshotter != nil {
@@ -855,22 +876,25 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
},
}
// Check if we have a join intent and use the LTime
if join := recentIntent(s.recentJoin, n.Name); join != nil {
member.statusLTime = join.LTime
// Check if we have a join or leave intent. The intent buffer
// will only hold one event for this node, so the more recent
// one will take effect.
if join, ok := recentIntent(s.recentIntents, n.Name, messageJoinType); ok {
member.statusLTime = join
}
// Check if we have a leave intent
if leave := recentIntent(s.recentLeave, n.Name); leave != nil {
if leave.LTime > member.statusLTime {
member.Status = StatusLeaving
member.statusLTime = leave.LTime
}
if leave, ok := recentIntent(s.recentIntents, n.Name, messageLeaveType); ok {
member.Status = StatusLeaving
member.statusLTime = leave
}
s.members[n.Name] = member
} else {
oldStatus = member.Status
deadTime := time.Now().Sub(member.leaveTime)
if oldStatus == StatusFailed && deadTime < s.config.FlapTimeout {
metrics.IncrCounter([]string{"serf", "member", "flap"}, 1)
}
member.Status = StatusAlive
member.leaveTime = time.Time{}
member.Addr = net.IP(n.Addr)
@@ -1011,18 +1035,8 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
member, ok := s.members[leaveMsg.Node]
if !ok {
// If we've already seen this message don't rebroadcast
if recentIntent(s.recentLeave, leaveMsg.Node) != nil {
return false
}
// We don't know this member so store it in a buffer for now
s.recentLeave[s.recentLeaveIndex] = nodeIntent{
LTime: leaveMsg.LTime,
Node: leaveMsg.Node,
}
s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave)
return true
// Rebroadcast only if this was an update we hadn't seen before.
return upsertIntent(s.recentIntents, leaveMsg.Node, messageLeaveType, leaveMsg.LTime, time.Now)
}
// If the message is old, then it is irrelevant and we can skip it
@@ -1082,15 +1096,8 @@ func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
member, ok := s.members[joinMsg.Node]
if !ok {
// If we've already seen this message don't rebroadcast
if recentIntent(s.recentJoin, joinMsg.Node) != nil {
return false
}
// We don't know this member so store it in a buffer for now
s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node}
s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin)
return true
// Rebroadcast only if this was an update we hadn't seen before.
return upsertIntent(s.recentIntents, joinMsg.Node, messageJoinType, joinMsg.LTime, time.Now)
}
// Check if this time is newer than what we have
@@ -1245,19 +1252,23 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
if err := s.memberlist.SendTo(&addr, raw); err != nil {
s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
}
if err := s.relayResponse(query.RelayFactor, addr, &ack); err != nil {
s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
}
}
}
if s.config.EventCh != nil {
s.config.EventCh <- &Query{
LTime: query.LTime,
Name: query.Name,
Payload: query.Payload,
serf: s,
id: query.ID,
addr: query.Addr,
port: query.Port,
deadline: time.Now().Add(query.Timeout),
LTime: query.LTime,
Name: query.Name,
Payload: query.Payload,
serf: s,
id: query.ID,
addr: query.Addr,
port: query.Port,
deadline: time.Now().Add(query.Timeout),
relayFactor: query.RelayFactor,
}
}
return rebroadcast
@@ -1290,25 +1301,37 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
// Process each type of response
if resp.Ack() {
// Exit early if this is a duplicate ack
if _, ok := query.acks[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
return
}
metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
select {
case query.ackCh <- resp.From:
query.acks[resp.From] = struct{}{}
default:
s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping")
s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
}
} else {
// Exit early if this is a duplicate response
if _, ok := query.responses[resp.From]; ok {
metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
return
}
metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
select {
case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
default:
s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping")
err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
if err != nil {
s.logger.Printf("[WARN] %v", err)
}
}
}
// handleNodeConflict is invoked when a join detects a conflict over a name.
// This means two different nodes (IP/Port) are claiming the same name. Memberlist
// will reject the "new" node mapping, but we can still be notified
// will reject the "new" node mapping, but we can still be notified.
func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) {
// Log a basic warning if the node is not us...
if existing.Name != s.config.NodeName {
@ -1361,7 +1384,7 @@ func (s *Serf) resolveNodeConflict() {
// Update the counters
responses++
if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
if member.Addr.Equal(local.Addr) && member.Port == local.Port {
matching++
}
}
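The move from bytes.Equal to member.Addr.Equal matters here: the same IPv4 address can be held as a 4-byte slice or as a 16-byte IPv4-in-IPv6 slice, and byte-wise comparison treats those as unequal. A minimal standalone sketch of the difference (not part of the diff):

package main

import (
	"bytes"
	"fmt"
	"net"
)

func main() {
	a := net.ParseIP("10.0.0.1")     // 16-byte IPv4-in-IPv6 representation
	b := net.IPv4(10, 0, 0, 1).To4() // 4-byte representation of the same address

	fmt.Println(bytes.Equal(a, b)) // false: the slices have different lengths
	fmt.Println(a.Equal(b))        // true: net.IP.Equal normalizes both forms
}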
@ -1382,14 +1405,17 @@ func (s *Serf) resolveNodeConflict() {
}
}
// handleReap periodically reaps the list of failed and left members.
// handleReap periodically reaps the list of failed and left members, as well
// as old buffered intents.
func (s *Serf) handleReap() {
for {
select {
case <-time.After(s.config.ReapInterval):
s.memberLock.Lock()
s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout)
s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout)
now := time.Now()
s.failedMembers = s.reap(s.failedMembers, now, s.config.ReconnectTimeout)
s.leftMembers = s.reap(s.leftMembers, now, s.config.TombstoneTimeout)
reapIntents(s.recentIntents, now, s.config.RecentIntentTimeout)
s.memberLock.Unlock()
case <-s.shutdownCh:
return
@ -1413,8 +1439,7 @@ func (s *Serf) handleReconnect() {
// reap is called with a list of old members and a timeout, and removes
// members that have exceeded the timeout. The members are removed from
// both the old list and the members itself. Locking is left to the caller.
func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState {
now := time.Now()
func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []*memberState {
n := len(old)
for i := 0; i < n; i++ {
m := old[i]
@ -1485,7 +1510,7 @@ func (s *Serf) reconnect() {
}
// Select a random member to try and join
idx := int(rand.Uint32() % uint32(n))
idx := rand.Int31n(int32(n))
mem := s.failedMembers[idx]
s.memberLock.RUnlock()
@ -1497,21 +1522,37 @@ func (s *Serf) reconnect() {
s.memberlist.Join([]string{addr.String()})
}
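rand.Int31n(int32(n)) is the idiomatic way to draw a uniform index in Go: unlike rand.Uint32() % uint32(n), it retries internally rather than taking a modulus, so it stays uniform even when n does not divide 2^32 evenly (the bias is tiny for small clusters, but free to avoid). A small sketch of the pattern, with invented names:

package main

import (
	"fmt"
	"math/rand"
)

// pickFailed selects a uniformly random candidate to reconnect to.
// rand.Int31n draws uniformly in [0, n) without modulo bias.
func pickFailed(addrs []string) string {
	return addrs[rand.Int31n(int32(len(addrs)))]
}

func main() {
	failed := []string{"10.0.0.1:7946", "10.0.0.2:7946", "10.0.0.3:7946"}
	fmt.Println(pickFailed(failed))
}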
// getQueueMax will get the maximum queue depth, which might be dynamic depending
// on how Serf is configured.
func (s *Serf) getQueueMax() int {
max := s.config.MaxQueueDepth
if s.config.MinQueueDepth > 0 {
s.memberLock.RLock()
max = 2 * len(s.members)
s.memberLock.RUnlock()
if max < s.config.MinQueueDepth {
max = s.config.MinQueueDepth
}
}
return max
}
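In other words, the queue limit is the static MaxQueueDepth unless MinQueueDepth is set, in which case it scales to twice the current member count with MinQueueDepth as a floor. A standalone sketch of the same arithmetic (the config values below are illustrative only):

package main

import "fmt"

// queueMax mirrors getQueueMax: a fixed cap by default, or a dynamic
// cap of 2x the member count (never below minDepth) when minDepth > 0.
func queueMax(maxDepth, minDepth, numMembers int) int {
	max := maxDepth
	if minDepth > 0 {
		max = 2 * numMembers
		if max < minDepth {
			max = minDepth
		}
	}
	return max
}

func main() {
	fmt.Println(queueMax(4096, 0, 50))     // 4096: static limit applies
	fmt.Println(queueMax(4096, 1024, 50))  // 1024: 2*50 raised to the floor
	fmt.Println(queueMax(4096, 1024, 900)) // 1800: dynamic limit wins
}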
// checkQueueDepth periodically checks the size of a queue to see if
// it is too large
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
for {
select {
case <-time.After(time.Second):
case <-time.After(s.config.QueueCheckInterval):
numq := queue.NumQueued()
metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
if numq >= s.config.QueueDepthWarning {
s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
}
if numq > s.config.MaxQueueDepth {
if max := s.getQueueMax(); numq > max {
s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
name, numq, s.config.MaxQueueDepth)
queue.Prune(s.config.MaxQueueDepth)
name, numq, max)
queue.Prune(max)
}
case <-s.shutdownCh:
return
@ -1533,24 +1574,46 @@ func removeOldMember(old []*memberState, name string) []*memberState {
return old
}
// recentIntent checks the recent intent buffer for a matching
// entry for a given node, and either returns the message or nil
func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) {
for i := 0; i < len(recent); i++ {
// Break fast if we hit a zero entry
if recent[i].LTime == 0 {
break
}
// Check for a node match
if recent[i].Node == node {
// Take the most recent entry
if intent == nil || recent[i].LTime > intent.LTime {
intent = &recent[i]
}
// reapIntents clears out any intents that are older than the timeout. Make sure
// the memberLock is held when passing in the Serf instance's recentIntents
// member.
func reapIntents(intents map[string]nodeIntent, now time.Time, timeout time.Duration) {
for node, intent := range intents {
if now.Sub(intent.WallTime) > timeout {
delete(intents, node)
}
}
return
}
// upsertIntent will update an existing intent with the supplied Lamport time,
// or create a new entry. It returns true if the entry was added or updated,
// i.e. this intent had not been seen before. The stamper is used to capture
// the wall clock time for expiring these buffered intents. Make sure the
// memberLock is held when passing in the Serf instance's recentIntents member.
func upsertIntent(intents map[string]nodeIntent, node string, itype messageType,
ltime LamportTime, stamper func() time.Time) bool {
if intent, ok := intents[node]; !ok || ltime > intent.LTime {
intents[node] = nodeIntent{
Type: itype,
WallTime: stamper(),
LTime: ltime,
}
return true
}
return false
}
// recentIntent checks the recent intent buffer for a matching entry for a given
// node, and returns the buffered Lamport time along with a boolean indicating
// whether an intent is present. Make sure the memberLock is held for read when
// passing in the Serf instance's recentIntents member.
func recentIntent(intents map[string]nodeIntent, node string, itype messageType) (LamportTime, bool) {
if intent, ok := intents[node]; ok && intent.Type == itype {
return intent.LTime, true
}
return LamportTime(0), false
}
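Together, upsertIntent, recentIntent, and reapIntents replace the old fixed-size recentJoin/recentLeave circular buffers with a single map keyed by node name: the highest Lamport time wins on insert, lookups match on intent type, and entries expire by wall-clock age. A rough, self-contained usage sketch with simplified types (not the vendored code):

package main

import (
	"fmt"
	"time"
)

type LamportTime uint64
type messageType int

const (
	messageJoinType messageType = iota
	messageLeaveType
)

type nodeIntent struct {
	Type     messageType
	WallTime time.Time
	LTime    LamportTime
}

// upsertIntent keeps only the newest intent per node; true means the
// caller saw something new and should rebroadcast.
func upsertIntent(intents map[string]nodeIntent, node string, itype messageType,
	ltime LamportTime, stamper func() time.Time) bool {
	if intent, ok := intents[node]; !ok || ltime > intent.LTime {
		intents[node] = nodeIntent{Type: itype, WallTime: stamper(), LTime: ltime}
		return true
	}
	return false
}

// recentIntent reports the buffered Lamport time for a node, if an
// intent of the requested type is present.
func recentIntent(intents map[string]nodeIntent, node string, itype messageType) (LamportTime, bool) {
	if intent, ok := intents[node]; ok && intent.Type == itype {
		return intent.LTime, true
	}
	return LamportTime(0), false
}

// reapIntents drops entries older than the timeout by wall-clock age.
func reapIntents(intents map[string]nodeIntent, now time.Time, timeout time.Duration) {
	for node, intent := range intents {
		if now.Sub(intent.WallTime) > timeout {
			delete(intents, node)
		}
	}
}

func main() {
	intents := make(map[string]nodeIntent)
	fmt.Println(upsertIntent(intents, "node-a", messageJoinType, 5, time.Now)) // true: new
	fmt.Println(upsertIntent(intents, "node-a", messageJoinType, 3, time.Now)) // false: older
	if lt, ok := recentIntent(intents, "node-a", messageJoinType); ok {
		fmt.Println("buffered join at Lamport time", lt) // 5
	}
	reapIntents(intents, time.Now().Add(time.Hour), 30*time.Minute)
	fmt.Println(len(intents)) // 0: expired
}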
// handleRejoin attempts to reconnect to previously known alive nodes
@ -1613,10 +1676,18 @@ func (s *Serf) Stats() map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
s.memberLock.RLock()
members := toString(uint64(len(s.members)))
failed := toString(uint64(len(s.failedMembers)))
left := toString(uint64(len(s.leftMembers)))
health_score := toString(uint64(s.memberlist.GetHealthScore()))
s.memberLock.RUnlock()
stats := map[string]string{
"members": toString(uint64(len(s.members))),
"failed": toString(uint64(len(s.failedMembers))),
"left": toString(uint64(len(s.leftMembers))),
"members": members,
"failed": failed,
"left": left,
"health_score": health_score,
"member_time": toString(uint64(s.clock.Time())),
"event_time": toString(uint64(s.eventClock.Time())),
"query_time": toString(uint64(s.queryClock.Time())),
@ -1625,6 +1696,9 @@ func (s *Serf) Stats() map[string]string {
"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
"encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()),
}
if !s.config.DisableCoordinates {
stats["coordinate_resets"] = toString(uint64(s.coordClient.Stats().Resets))
}
return stats
}
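The Stats change also appears to fix a subtle race: the member-list lengths and health score are now copied into locals while memberLock is held for read, rather than read from shared state while the map is being built. The general pattern, sketched with invented types:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu      sync.RWMutex
	members map[string]struct{}
	failed  []string
}

// stats snapshots shared counters under the read lock, then builds
// the result map without holding it.
func (r *registry) stats() map[string]string {
	r.mu.RLock()
	members := len(r.members)
	failed := len(r.failed)
	r.mu.RUnlock()

	return map[string]string{
		"members": fmt.Sprint(members),
		"failed":  fmt.Sprint(failed),
	}
}

func main() {
	r := &registry{members: map[string]struct{}{"a": {}, "b": {}}}
	fmt.Println(r.stats())
}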

View file

@ -2,7 +2,6 @@ package serf
import (
"bufio"
"encoding/json"
"fmt"
"log"
"math/rand"
@ -13,7 +12,6 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/serf/coordinate"
)
/*
@ -27,34 +25,59 @@ nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/
const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
const coordinateUpdateInterval = 60 * time.Second
const tmpExt = ".compact"
const (
// flushInterval is how often we force a flush of the snapshot file
flushInterval = 500 * time.Millisecond
// clockUpdateInterval is how often we fetch the cluster's current Lamport time and write it to the snapshot file
clockUpdateInterval = 500 * time.Millisecond
// tmpExt is the extension we use for the temporary file during compaction
tmpExt = ".compact"
// snapshotErrorRecoveryInterval is how often we attempt to recover from
// errors writing to the snapshot file.
snapshotErrorRecoveryInterval = 30 * time.Second
// eventChSize is the size of the event buffers between Serf and the
// consuming application. If this is exhausted we will block Serf and Memberlist.
eventChSize = 2048
// shutdownFlushTimeout is the time limit to write pending events to the snapshot during a shutdown
shutdownFlushTimeout = 250 * time.Millisecond
// snapshotBytesPerNode is an estimated bytes per node to snapshot
snapshotBytesPerNode = 128
// snapshotCompactionThreshold is the threshold we apply to
// the snapshot size estimate (nodes * bytes per node) before compacting.
snapshotCompactionThreshold = 2
)
// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
type Snapshotter struct {
aliveNodes map[string]string
clock *LamportClock
coordClient *coordinate.Client
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
lastQueryClock LamportTime
leaveCh chan struct{}
leaving bool
logger *log.Logger
maxSize int64
path string
offset int64
outCh chan<- Event
rejoinAfterLeave bool
shutdownCh <-chan struct{}
waitCh chan struct{}
aliveNodes map[string]string
clock *LamportClock
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
streamCh chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
lastQueryClock LamportTime
leaveCh chan struct{}
leaving bool
logger *log.Logger
minCompactSize int64
path string
offset int64
outCh chan<- Event
rejoinAfterLeave bool
shutdownCh <-chan struct{}
waitCh chan struct{}
lastAttemptedCompaction time.Time
}
// PreviousNode is used to represent the previously known alive nodes
@ -74,17 +97,17 @@ func (p PreviousNode) String() string {
// Setting rejoinAfterLeave makes leave not clear the state, and can be used
// if you intend to rejoin the same cluster after a leave.
func NewSnapshotter(path string,
maxSize int,
minCompactSize int,
rejoinAfterLeave bool,
logger *log.Logger,
clock *LamportClock,
coordClient *coordinate.Client,
outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024)
inCh := make(chan Event, eventChSize)
streamCh := make(chan Event, eventChSize)
// Try to open the file
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
}
@ -101,16 +124,16 @@ func NewSnapshotter(path string,
snap := &Snapshotter{
aliveNodes: make(map[string]string),
clock: clock,
coordClient: coordClient,
fh: fh,
buffered: bufio.NewWriter(fh),
inCh: inCh,
streamCh: streamCh,
lastClock: 0,
lastEventClock: 0,
lastQueryClock: 0,
leaveCh: make(chan struct{}),
logger: logger,
maxSize: int64(maxSize),
minCompactSize: int64(minCompactSize),
path: path,
offset: offset,
outCh: outCh,
@ -126,6 +149,7 @@ func NewSnapshotter(path string,
}
// Start handling new commands
go snap.teeStream()
go snap.stream()
return inCh, snap, nil
}
@ -175,13 +199,68 @@ func (s *Snapshotter) Leave() {
}
}
// teeStream is a long running routine that is used to copy events
// to the output channel and the internal event handler.
func (s *Snapshotter) teeStream() {
flushEvent := func(e Event) {
// Forward to the internal stream, do not block
select {
case s.streamCh <- e:
default:
}
// Forward the event immediately, do not block
if s.outCh != nil {
select {
case s.outCh <- e:
default:
}
}
}
OUTER:
for {
select {
case e := <-s.inCh:
flushEvent(e)
case <-s.shutdownCh:
break OUTER
}
}
// Drain any remaining events before exiting
for {
select {
case e := <-s.inCh:
flushEvent(e)
default:
return
}
}
}
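teeStream decouples ingestion from both consumers: each event is offered to the internal stream channel and to the application's outCh with non-blocking sends, so a slow consumer drops events instead of stalling memberlist. The fan-out pattern in isolation (channel names invented for the example):

package main

import "fmt"

// tee offers every input event to both outputs without blocking;
// an event is dropped for any receiver whose buffer is full.
func tee(in <-chan string, a, b chan<- string) {
	for e := range in {
		select {
		case a <- e:
		default: // a is full: drop rather than block the producer
		}
		select {
		case b <- e:
		default: // likewise for b
		}
	}
}

func main() {
	in := make(chan string, 2)
	a := make(chan string, 2)
	b := make(chan string) // unbuffered with no reader: events are dropped

	in <- "member-join"
	in <- "member-leave"
	close(in)
	tee(in, a, b)

	close(a)
	for e := range a {
		fmt.Println(e) // both events arrive on the buffered channel
	}
}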
// stream is a long running routine that is used to handle events
func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop()
coordinateTicker := time.NewTicker(coordinateUpdateInterval)
defer coordinateTicker.Stop()
// flushEvent is used to handle writing out an event
flushEvent := func(e Event) {
// Stop recording events after a leave is issued
if s.leaving {
return
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
}
for {
select {
@ -200,34 +279,32 @@ func (s *Snapshotter) stream() {
s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err)
}
case e := <-s.inCh:
// Forward the event immediately
if s.outCh != nil {
s.outCh <- e
}
// Stop recording events after a leave is issued
if s.leaving {
continue
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
case e := <-s.streamCh:
flushEvent(e)
case <-clockTicker.C:
s.updateClock()
case <-coordinateTicker.C:
s.updateCoordinate()
case <-s.shutdownCh:
// Setup a timeout
flushTimeout := time.After(shutdownFlushTimeout)
// Snapshot the clock
s.updateClock()
// Clear out the buffers
FLUSH:
for {
select {
case e := <-s.streamCh:
flushEvent(e)
case <-flushTimeout:
break FLUSH
default:
break FLUSH
}
}
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
}
@ -273,20 +350,6 @@ func (s *Snapshotter) updateClock() {
}
}
// updateCoordinate is called periodically to write out the current local
// coordinate. It's safe to call this if coordinates aren't enabled (nil
// client) and it will be a no-op.
func (s *Snapshotter) updateCoordinate() {
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode coordinate: %v", err)
} else {
s.tryAppend(fmt.Sprintf("coordinate: %s\n", encoded))
}
}
}
// processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) {
// Ignore old clocks
@ -311,6 +374,17 @@ func (s *Snapshotter) processQuery(q *Query) {
func (s *Snapshotter) tryAppend(l string) {
if err := s.appendLine(l); err != nil {
s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
now := time.Now()
if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval {
s.lastAttemptedCompaction = now
s.logger.Printf("[INFO] serf: Attempting compaction to recover from error...")
err = s.compact()
if err != nil {
s.logger.Printf("[ERR] serf: Compaction failed, will reattempt after %v: %v", snapshotErrorRecoveryInterval, err)
} else {
s.logger.Printf("[INFO] serf: Finished compaction, successfully recovered from error state")
}
}
}
}
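The recovery path is deliberately rate-limited: a failed append triggers at most one compaction attempt per snapshotErrorRecoveryInterval, so a persistently failing disk cannot cause a compaction storm. The throttling pattern on its own (a sketch, not the vendored code):

package main

import (
	"errors"
	"fmt"
	"time"
)

type recoverer struct {
	lastAttempt time.Time
	interval    time.Duration
}

// maybeRecover runs fn at most once per interval; calls that arrive
// sooner are silently skipped.
func (r *recoverer) maybeRecover(fn func() error) {
	now := time.Now()
	if now.Sub(r.lastAttempt) <= r.interval {
		return
	}
	r.lastAttempt = now
	if err := fn(); err != nil {
		fmt.Println("recovery failed, will retry after", r.interval, ":", err)
	}
}

func main() {
	r := &recoverer{interval: 30 * time.Second}
	r.maybeRecover(func() error { return errors.New("disk full") }) // attempted
	r.maybeRecover(func() error { return nil })                     // skipped: too soon
}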
@ -334,12 +408,25 @@ func (s *Snapshotter) appendLine(l string) error {
// Check if a compaction is necessary
s.offset += int64(n)
if s.offset > s.maxSize {
if s.offset > s.snapshotMaxSize() {
return s.compact()
}
return nil
}
// snapshotMaxSize computes the maximum size and is used to force periodic compaction.
func (s *Snapshotter) snapshotMaxSize() int64 {
nodes := int64(len(s.aliveNodes))
estSize := nodes * snapshotBytesPerNode
threshold := estSize * snapshotCompactionThreshold
// Apply a minimum threshold to avoid frequent compaction
if threshold < s.minCompactSize {
threshold = s.minCompactSize
}
return threshold
}
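Worked through with the constants above: 1,000 alive nodes estimate to 1,000 × 128 = 128,000 bytes, doubled by the compaction threshold to 256,000 bytes; a 5-node cluster computes only 1,280 bytes, so a configured floor of, say, 128 KiB (an illustrative value) would govern instead. The same arithmetic as a standalone sketch:

package main

import "fmt"

const (
	snapshotBytesPerNode        = 128
	snapshotCompactionThreshold = 2
)

// snapshotMaxSize estimates the snapshot size from the node count and
// applies the compaction threshold, with minCompactSize as a floor.
func snapshotMaxSize(nodes, minCompactSize int64) int64 {
	threshold := nodes * snapshotBytesPerNode * snapshotCompactionThreshold
	if threshold < minCompactSize {
		threshold = minCompactSize
	}
	return threshold
}

func main() {
	fmt.Println(snapshotMaxSize(1000, 128*1024)) // 256000: node estimate wins
	fmt.Println(snapshotMaxSize(5, 128*1024))    // 131072: the floor applies
}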
// Compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())
@ -391,30 +478,22 @@ func (s *Snapshotter) compact() error {
}
offset += int64(n)
// Write out the coordinate.
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
fh.Close()
return err
}
line = fmt.Sprintf("coordinate: %s\n", encoded)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
}
// Flush the new snapshot
err = buf.Flush()
fh.Close()
if err != nil {
return fmt.Errorf("failed to flush new snapshot: %v", err)
}
err = fh.Sync()
if err != nil {
fh.Close()
return fmt.Errorf("failed to fsync new snapshot: %v", err)
}
fh.Close()
// We now need to swap the old snapshot file with the new snapshot.
// Turns out, Windows won't let us rename the files if we have
// open handles to them or if the destination already exists. This
@ -520,19 +599,7 @@ func (s *Snapshotter) replay() error {
s.lastQueryClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "coordinate: ") {
if s.coordClient == nil {
s.logger.Printf("[WARN] serf: Ignoring snapshot coordinates since they are disabled")
continue
}
coordStr := strings.TrimPrefix(line, "coordinate: ")
var coord coordinate.Coordinate
err := json.Unmarshal([]byte(coordStr), &coord)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
continue
}
s.coordClient.SetCoordinate(&coord)
continue // Ignore any coordinate persistence from old snapshots; Serf should re-converge
} else if line == "leave" {
// Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave {

vendor/modules.txt vendored
View file

@ -417,7 +417,7 @@ github.com/hashicorp/go-immutable-radix
# github.com/hashicorp/go-memdb v1.3.2
## explicit; go 1.12
github.com/hashicorp/go-memdb
# github.com/hashicorp/go-msgpack v0.5.3 => github.com/hashicorp/go-msgpack v0.0.0-20140221154404-71c2886f5a67
# github.com/hashicorp/go-msgpack v0.5.3
## explicit
github.com/hashicorp/go-msgpack/codec
# github.com/hashicorp/go-multierror v1.1.1
@ -432,7 +432,7 @@ github.com/hashicorp/golang-lru/simplelru
# github.com/hashicorp/memberlist v0.2.4
## explicit; go 1.12
github.com/hashicorp/memberlist
# github.com/hashicorp/serf v0.8.2 => github.com/hashicorp/serf v0.7.1-0.20160317193612-598c54895cc5
# github.com/hashicorp/serf v0.8.2
## explicit
github.com/hashicorp/serf/coordinate
github.com/hashicorp/serf/serf
@ -775,8 +775,6 @@ github.com/vishvananda/netlink/nl
# github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f
## explicit; go 1.12
github.com/vishvananda/netns
# github.com/vmihailenco/msgpack v4.0.4+incompatible
## explicit
# go.etcd.io/bbolt v1.3.6
## explicit; go 1.12
go.etcd.io/bbolt
@ -1114,10 +1112,6 @@ gotest.tools/v3/internal/format
gotest.tools/v3/internal/source
gotest.tools/v3/poll
gotest.tools/v3/skip
# labix.org/v2/mgo v0.0.0-20140701140051-000000000287
## explicit
# github.com/armon/go-radix => github.com/armon/go-radix v0.0.0-20150105235045-e39d623f12e8
# github.com/hashicorp/go-msgpack => github.com/hashicorp/go-msgpack v0.0.0-20140221154404-71c2886f5a67
# github.com/hashicorp/serf => github.com/hashicorp/serf v0.7.1-0.20160317193612-598c54895cc5
# github.com/rexray/gocsi => github.com/dperny/gocsi v1.2.3-pre
# github.com/google/certificate-transparency-go => github.com/google/certificate-transparency-go v1.0.20