Merge pull request #46752 from corhere/libn/diagnostics-handler-ctx
libnetwork/diagnostic: clean up Server type
This commit is contained in:
commit
3b4207896d
6 changed files with 201 additions and 268 deletions
|
@ -332,7 +332,7 @@ func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
|
|||
}
|
||||
|
||||
// Register the diagnostic handlers
|
||||
c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
|
||||
nDB.RegisterDiagnosticHandlers(c.DiagnosticServer)
|
||||
|
||||
var cancelList []func()
|
||||
ch, cancel := nDB.Watch(libnetworkEPTable, "")
|
||||
|
|
|
@ -21,11 +21,7 @@ var (
|
|||
ipAddr string
|
||||
)
|
||||
|
||||
var testerPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
|
||||
"/myip": ipaddress,
|
||||
}
|
||||
|
||||
func ipaddress(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func ipaddress(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintf(w, "%s\n", ipAddr)
|
||||
}
|
||||
|
||||
|
@ -52,7 +48,6 @@ func Server(args []string) {
|
|||
log.G(context.TODO()).Infof("%s uses IP %s\n", localNodeName, ipAddr)
|
||||
|
||||
server = diagnostic.New()
|
||||
server.Init()
|
||||
conf := networkdb.DefaultConfig()
|
||||
conf.Hostname = localNodeName
|
||||
conf.AdvertiseAddr = ipAddr
|
||||
|
@ -64,9 +59,9 @@ func Server(args []string) {
|
|||
}
|
||||
|
||||
// Register network db handlers
|
||||
server.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
|
||||
server.RegisterHandler(nil, testerPaths2Func)
|
||||
server.RegisterHandler(nDB, dummyclient.DummyClientPaths2Func)
|
||||
nDB.RegisterDiagnosticHandlers(server)
|
||||
server.HandleFunc("/myip", ipaddress)
|
||||
dummyclient.RegisterDiagnosticHandlers(server, nDB)
|
||||
server.EnableDiagnostic("", port)
|
||||
// block here
|
||||
select {}
|
||||
|
|
|
@ -11,10 +11,13 @@ import (
|
|||
events "github.com/docker/go-events"
|
||||
)
|
||||
|
||||
// DummyClientPaths2Func exported paths for the client
|
||||
var DummyClientPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
|
||||
"/watchtable": watchTable,
|
||||
"/watchedtableentries": watchTableEntries,
|
||||
type Mux interface {
|
||||
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
|
||||
}
|
||||
|
||||
func RegisterDiagnosticHandlers(mux Mux, nDB *networkdb.NetworkDB) {
|
||||
mux.HandleFunc("/watchtable", watchTable(nDB))
|
||||
mux.HandleFunc("/watchedtableentries", watchTableEntries)
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -28,23 +31,22 @@ type tableHandler struct {
|
|||
|
||||
var clientWatchTable = map[string]tableHandler{}
|
||||
|
||||
func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm() //nolint:errcheck
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 {
|
||||
rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
|
||||
diagnostic.HTTPReply(w, rsp, &diagnostic.JSONOutput{}) //nolint:errcheck
|
||||
return
|
||||
}
|
||||
func watchTable(nDB *networkdb.NetworkDB) func(w http.ResponseWriter, r *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm() //nolint:errcheck
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 {
|
||||
rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
|
||||
diagnostic.HTTPReply(w, rsp, &diagnostic.JSONOutput{}) //nolint:errcheck
|
||||
return
|
||||
}
|
||||
|
||||
tableName := r.Form["tname"][0]
|
||||
if _, ok := clientWatchTable[tableName]; ok {
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
return
|
||||
}
|
||||
tableName := r.Form["tname"][0]
|
||||
if _, ok := clientWatchTable[tableName]; ok {
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
return
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*networkdb.NetworkDB)
|
||||
if ok {
|
||||
ch, cancel := nDB.Watch(tableName, "")
|
||||
clientWatchTable[tableName] = tableHandler{cancelWatch: cancel, entries: make(map[string]string)}
|
||||
go handleTableEvents(tableName, ch)
|
||||
|
@ -53,7 +55,7 @@ func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func watchTableEntries(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func watchTableEntries(w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm() //nolint:errcheck
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 {
|
||||
|
|
|
@ -115,7 +115,6 @@ func New(cfgOptions ...config.Option) (*Controller, error) {
|
|||
networkLocker: locker.New(),
|
||||
DiagnosticServer: diagnostic.New(),
|
||||
}
|
||||
c.DiagnosticServer.Init()
|
||||
|
||||
if err := c.initStores(); err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -16,63 +16,53 @@ import (
|
|||
"github.com/docker/docker/pkg/stack"
|
||||
)
|
||||
|
||||
// HTTPHandlerFunc TODO
|
||||
type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
|
||||
|
||||
type httpHandlerCustom struct {
|
||||
ctx interface{}
|
||||
F func(interface{}, http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
// ServeHTTP TODO
|
||||
func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.F(h.ctx, w, r)
|
||||
}
|
||||
|
||||
var diagPaths2Func = map[string]HTTPHandlerFunc{
|
||||
"/": notImplemented,
|
||||
"/help": help,
|
||||
"/ready": ready,
|
||||
"/stackdump": stackTrace,
|
||||
}
|
||||
|
||||
// Server when the debug is enabled exposes a
|
||||
// This data structure is protected by the Agent mutex so does not require and additional mutex here
|
||||
type Server struct {
|
||||
enable int32
|
||||
srv *http.Server
|
||||
port int
|
||||
mux *http.ServeMux
|
||||
registeredHanders map[string]bool
|
||||
sync.Mutex
|
||||
mu sync.Mutex
|
||||
enable int32
|
||||
srv *http.Server
|
||||
port int
|
||||
mux *http.ServeMux
|
||||
handlers map[string]http.Handler
|
||||
}
|
||||
|
||||
// New creates a new diagnostic server
|
||||
func New() *Server {
|
||||
return &Server{
|
||||
registeredHanders: make(map[string]bool),
|
||||
s := &Server{
|
||||
mux: http.NewServeMux(),
|
||||
handlers: make(map[string]http.Handler),
|
||||
}
|
||||
s.HandleFunc("/", notImplemented)
|
||||
s.HandleFunc("/help", s.help)
|
||||
s.HandleFunc("/ready", ready)
|
||||
s.HandleFunc("/stackdump", stackTrace)
|
||||
return s
|
||||
}
|
||||
|
||||
// Init initialize the mux for the http handling and register the base hooks
|
||||
func (s *Server) Init() {
|
||||
s.mux = http.NewServeMux()
|
||||
|
||||
// Register local handlers
|
||||
s.RegisterHandler(s, diagPaths2Func)
|
||||
// Handle registers the handler for the given pattern,
|
||||
// replacing any existing handler.
|
||||
func (s *Server) Handle(pattern string, handler http.Handler) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.handlers[pattern]; !ok {
|
||||
// Register a handler on the mux which allows the underlying handler to
|
||||
// be dynamically switched out. The http.ServeMux will panic if one
|
||||
// attempts to register a handler for the same pattern twice.
|
||||
s.mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
|
||||
s.mu.Lock()
|
||||
h := s.handlers[pattern]
|
||||
s.mu.Unlock()
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
s.handlers[pattern] = handler
|
||||
}
|
||||
|
||||
// RegisterHandler allows to register new handlers to the mux and to a specific path
|
||||
func (s *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for path, fun := range hdlrs {
|
||||
if _, ok := s.registeredHanders[path]; ok {
|
||||
continue
|
||||
}
|
||||
s.mux.Handle(path, httpHandlerCustom{ctx, fun})
|
||||
s.registeredHanders[path] = true
|
||||
}
|
||||
// Handle registers the handler function for the given pattern,
|
||||
// replacing any existing handler.
|
||||
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
|
||||
s.Handle(pattern, http.HandlerFunc(handler))
|
||||
}
|
||||
|
||||
// ServeHTTP this is the method called bu the ListenAndServe, and is needed to allow us to
|
||||
|
@ -83,8 +73,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// EnableDiagnostic opens a TCP socket to debug the passed network DB
|
||||
func (s *Server) EnableDiagnostic(ip string, port int) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.port = port
|
||||
|
||||
|
@ -112,8 +102,8 @@ func (s *Server) EnableDiagnostic(ip string, port int) {
|
|||
|
||||
// DisableDiagnostic stop the dubug and closes the tcp socket
|
||||
func (s *Server) DisableDiagnostic() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.srv.Shutdown(context.Background()) //nolint:errcheck
|
||||
s.srv = nil
|
||||
|
@ -123,12 +113,12 @@ func (s *Server) DisableDiagnostic() {
|
|||
|
||||
// IsDiagnosticEnabled returns true when the debug is enabled
|
||||
func (s *Server) IsDiagnosticEnabled() bool {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.enable == 1
|
||||
}
|
||||
|
||||
func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func notImplemented(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
_, jsonOutput := ParseHTTPFormOptions(r)
|
||||
rsp := WrongCommand("not implemented", fmt.Sprintf("URL path: %s no method implemented check /help\n", r.URL.Path))
|
||||
|
@ -144,7 +134,7 @@ func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
_, _ = HTTPReply(w, rsp, jsonOutput)
|
||||
}
|
||||
|
||||
func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (s *Server) help(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
_, jsonOutput := ParseHTTPFormOptions(r)
|
||||
|
||||
|
@ -156,17 +146,16 @@ func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
"url": r.URL.String(),
|
||||
}).Info("help done")
|
||||
|
||||
n, ok := ctx.(*Server)
|
||||
var result string
|
||||
if ok {
|
||||
for path := range n.registeredHanders {
|
||||
result += fmt.Sprintf("%s\n", path)
|
||||
}
|
||||
_, _ = HTTPReply(w, CommandSucceed(&StringCmd{Info: result}), jsonOutput)
|
||||
s.mu.Lock()
|
||||
for path := range s.handlers {
|
||||
result += fmt.Sprintf("%s\n", path)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
_, _ = HTTPReply(w, CommandSucceed(&StringCmd{Info: result}), jsonOutput)
|
||||
}
|
||||
|
||||
func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func ready(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
_, jsonOutput := ParseHTTPFormOptions(r)
|
||||
|
||||
|
@ -180,7 +169,7 @@ func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
_, _ = HTTPReply(w, CommandSucceed(&StringCmd{Info: "OK"}), jsonOutput)
|
||||
}
|
||||
|
||||
func stackTrace(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func stackTrace(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
_, jsonOutput := ParseHTTPFormOptions(r)
|
||||
|
||||
|
|
|
@ -17,22 +17,25 @@ const (
|
|||
dbNotAvailable = "database not available"
|
||||
)
|
||||
|
||||
// NetDbPaths2Func TODO
|
||||
var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
|
||||
"/join": dbJoin,
|
||||
"/networkpeers": dbPeers,
|
||||
"/clusterpeers": dbClusterPeers,
|
||||
"/joinnetwork": dbJoinNetwork,
|
||||
"/leavenetwork": dbLeaveNetwork,
|
||||
"/createentry": dbCreateEntry,
|
||||
"/updateentry": dbUpdateEntry,
|
||||
"/deleteentry": dbDeleteEntry,
|
||||
"/getentry": dbGetEntry,
|
||||
"/gettable": dbGetTable,
|
||||
"/networkstats": dbNetworkStats,
|
||||
type Mux interface {
|
||||
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
|
||||
}
|
||||
|
||||
func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) RegisterDiagnosticHandlers(m Mux) {
|
||||
m.HandleFunc("/join", nDB.dbJoin)
|
||||
m.HandleFunc("/networkpeers", nDB.dbPeers)
|
||||
m.HandleFunc("/clusterpeers", nDB.dbClusterPeers)
|
||||
m.HandleFunc("/joinnetwork", nDB.dbJoinNetwork)
|
||||
m.HandleFunc("/leavenetwork", nDB.dbLeaveNetwork)
|
||||
m.HandleFunc("/createentry", nDB.dbCreateEntry)
|
||||
m.HandleFunc("/updateentry", nDB.dbUpdateEntry)
|
||||
m.HandleFunc("/deleteentry", nDB.dbDeleteEntry)
|
||||
m.HandleFunc("/getentry", nDB.dbGetEntry)
|
||||
m.HandleFunc("/gettable", nDB.dbGetTable)
|
||||
m.HandleFunc("/networkstats", nDB.dbNetworkStats)
|
||||
}
|
||||
|
||||
func (nDB *NetworkDB) dbJoin(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
_, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -53,24 +56,19 @@ func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
err := nDB.Join(strings.Split(r.Form["members"][0], ","))
|
||||
if err != nil {
|
||||
rsp := diagnostic.FailCommand(fmt.Errorf("%s error in the DB join %s", r.URL.Path, err))
|
||||
logger.WithError(err).Error("join cluster failed")
|
||||
diagnostic.HTTPReply(w, rsp, json)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("join cluster done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
err := nDB.Join(strings.Split(r.Form["members"][0], ","))
|
||||
if err != nil {
|
||||
rsp := diagnostic.FailCommand(fmt.Errorf("%s error in the DB join %s", r.URL.Path, err))
|
||||
logger.WithError(err).Error("join cluster failed")
|
||||
diagnostic.HTTPReply(w, rsp, json)
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
|
||||
logger.Info("join cluster done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
}
|
||||
|
||||
func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbPeers(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
_, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -91,25 +89,20 @@ func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
peers := nDB.Peers(r.Form["nid"][0])
|
||||
rsp := &diagnostic.TableObj{Length: len(peers)}
|
||||
for i, peerInfo := range peers {
|
||||
if peerInfo.IP == "unknown" {
|
||||
rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: "orphan-" + peerInfo.Name, IP: peerInfo.IP})
|
||||
} else {
|
||||
rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
|
||||
}
|
||||
peers := nDB.Peers(r.Form["nid"][0])
|
||||
rsp := &diagnostic.TableObj{Length: len(peers)}
|
||||
for i, peerInfo := range peers {
|
||||
if peerInfo.IP == "unknown" {
|
||||
rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: "orphan-" + peerInfo.Name, IP: peerInfo.IP})
|
||||
} else {
|
||||
rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
|
||||
}
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network peers done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network peers done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
}
|
||||
|
||||
func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbClusterPeers(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
_, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -123,21 +116,16 @@ func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
})
|
||||
logger.Info("cluster peers")
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
peers := nDB.ClusterPeers()
|
||||
rsp := &diagnostic.TableObj{Length: len(peers)}
|
||||
for i, peerInfo := range peers {
|
||||
rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
|
||||
}
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("cluster peers done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
return
|
||||
peers := nDB.ClusterPeers()
|
||||
rsp := &diagnostic.TableObj{Length: len(peers)}
|
||||
for i, peerInfo := range peers {
|
||||
rsp.Elements = append(rsp.Elements, &diagnostic.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("cluster peers done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
}
|
||||
|
||||
func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbCreateEntry(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
unsafe, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -176,22 +164,17 @@ func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.CreateEntry(tname, nid, key, decodedValue); err != nil {
|
||||
rsp := diagnostic.FailCommand(err)
|
||||
diagnostic.HTTPReply(w, rsp, json)
|
||||
logger.WithError(err).Error("create entry failed")
|
||||
return
|
||||
}
|
||||
logger.Info("create entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
if err := nDB.CreateEntry(tname, nid, key, decodedValue); err != nil {
|
||||
rsp := diagnostic.FailCommand(err)
|
||||
diagnostic.HTTPReply(w, rsp, json)
|
||||
logger.WithError(err).Error("create entry failed")
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.Info("create entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
}
|
||||
|
||||
func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbUpdateEntry(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
unsafe, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -230,21 +213,16 @@ func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.UpdateEntry(tname, nid, key, decodedValue); err != nil {
|
||||
logger.WithError(err).Error("update entry failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
logger.Info("update entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
if err := nDB.UpdateEntry(tname, nid, key, decodedValue); err != nil {
|
||||
logger.WithError(err).Error("update entry failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.Info("update entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
}
|
||||
|
||||
func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbDeleteEntry(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
_, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -271,22 +249,17 @@ func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
nid := r.Form["nid"][0]
|
||||
key := r.Form["key"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
err := nDB.DeleteEntry(tname, nid, key)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("delete entry failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
logger.Info("delete entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
err := nDB.DeleteEntry(tname, nid, key)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("delete entry failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.Info("delete entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
}
|
||||
|
||||
func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbGetEntry(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
unsafe, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -313,31 +286,26 @@ func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
nid := r.Form["nid"][0]
|
||||
key := r.Form["key"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
value, err := nDB.GetEntry(tname, nid, key)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("get entry failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
|
||||
var encodedValue string
|
||||
if unsafe {
|
||||
encodedValue = string(value)
|
||||
} else {
|
||||
encodedValue = base64.StdEncoding.EncodeToString(value)
|
||||
}
|
||||
|
||||
rsp := &diagnostic.TableEntryObj{Key: key, Value: encodedValue}
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
value, err := nDB.GetEntry(tname, nid, key)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("get entry failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
|
||||
var encodedValue string
|
||||
if unsafe {
|
||||
encodedValue = string(value)
|
||||
} else {
|
||||
encodedValue = base64.StdEncoding.EncodeToString(value)
|
||||
}
|
||||
|
||||
rsp := &diagnostic.TableEntryObj{Key: key, Value: encodedValue}
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get entry done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
}
|
||||
|
||||
func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbJoinNetwork(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
_, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -360,21 +328,16 @@ func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
nid := r.Form["nid"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.JoinNetwork(nid); err != nil {
|
||||
logger.WithError(err).Error("join network failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
logger.Info("join network done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
if err := nDB.JoinNetwork(nid); err != nil {
|
||||
logger.WithError(err).Error("join network failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.Info("join network done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
}
|
||||
|
||||
func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbLeaveNetwork(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
_, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -397,21 +360,16 @@ func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
nid := r.Form["nid"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.LeaveNetwork(nid); err != nil {
|
||||
logger.WithError(err).Error("leave network failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
logger.Info("leave network done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
if err := nDB.LeaveNetwork(nid); err != nil {
|
||||
logger.WithError(err).Error("leave network failed")
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(err), json)
|
||||
return
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.Info("leave network done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(nil), json)
|
||||
}
|
||||
|
||||
func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbGetTable(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
unsafe, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -436,35 +394,30 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
tname := r.Form["tname"][0]
|
||||
nid := r.Form["nid"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
table := nDB.GetTableByNetwork(tname, nid)
|
||||
rsp := &diagnostic.TableObj{Length: len(table)}
|
||||
i := 0
|
||||
for k, v := range table {
|
||||
var encodedValue string
|
||||
if unsafe {
|
||||
encodedValue = string(v.Value)
|
||||
} else {
|
||||
encodedValue = base64.StdEncoding.EncodeToString(v.Value)
|
||||
}
|
||||
rsp.Elements = append(rsp.Elements,
|
||||
&diagnostic.TableEntryObj{
|
||||
Index: i,
|
||||
Key: k,
|
||||
Value: encodedValue,
|
||||
Owner: v.owner,
|
||||
})
|
||||
i++
|
||||
table := nDB.GetTableByNetwork(tname, nid)
|
||||
rsp := &diagnostic.TableObj{Length: len(table)}
|
||||
i := 0
|
||||
for k, v := range table {
|
||||
var encodedValue string
|
||||
if unsafe {
|
||||
encodedValue = string(v.Value)
|
||||
} else {
|
||||
encodedValue = base64.StdEncoding.EncodeToString(v.Value)
|
||||
}
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
return
|
||||
rsp.Elements = append(rsp.Elements,
|
||||
&diagnostic.TableEntryObj{
|
||||
Index: i,
|
||||
Key: k,
|
||||
Value: encodedValue,
|
||||
Owner: v.owner,
|
||||
})
|
||||
i++
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
|
||||
diagnostic.HTTPReply(w, diagnostic.CommandSucceed(rsp), json)
|
||||
}
|
||||
|
||||
func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
func (nDB *NetworkDB) dbNetworkStats(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
diagnostic.DebugHTTPForm(r)
|
||||
_, json := diagnostic.ParseHTTPFormOptions(r)
|
||||
|
@ -485,24 +438,19 @@ func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
nDB.RLock()
|
||||
networks := nDB.networks[nDB.config.NodeID]
|
||||
network, ok := networks[r.Form["nid"][0]]
|
||||
|
||||
entries := -1
|
||||
qLen := -1
|
||||
if ok {
|
||||
nDB.RLock()
|
||||
networks := nDB.networks[nDB.config.NodeID]
|
||||
network, ok := networks[r.Form["nid"][0]]
|
||||
|
||||
entries := -1
|
||||
qLen := -1
|
||||
if ok {
|
||||
entries = int(network.entriesNumber.Load())
|
||||
qLen = network.tableBroadcasts.NumQueued()
|
||||
}
|
||||
nDB.RUnlock()
|
||||
|
||||
rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen})
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done")
|
||||
diagnostic.HTTPReply(w, rsp, json)
|
||||
return
|
||||
entries = int(network.entriesNumber.Load())
|
||||
qLen = network.tableBroadcasts.NumQueued()
|
||||
}
|
||||
diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf(dbNotAvailable)), json)
|
||||
nDB.RUnlock()
|
||||
|
||||
rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen})
|
||||
logger.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done")
|
||||
diagnostic.HTTPReply(w, rsp, json)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue