|
@@ -16,6 +16,7 @@ package dbus
|
|
|
|
|
|
import (
|
|
|
"errors"
|
|
|
+ "log"
|
|
|
"time"
|
|
|
|
|
|
"github.com/godbus/dbus"
|
|
@@ -36,22 +37,12 @@ func (c *Conn) Subscribe() error {
|
|
|
c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
|
|
|
"type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
|
|
|
|
|
|
- err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- return nil
|
|
|
+ return c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
|
|
|
}
|
|
|
|
|
|
// Unsubscribe this connection from systemd dbus events.
|
|
|
func (c *Conn) Unsubscribe() error {
|
|
|
- err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- return nil
|
|
|
+ return c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
|
|
|
}
|
|
|
|
|
|
func (c *Conn) dispatch() {
|
|
@@ -70,7 +61,8 @@ func (c *Conn) dispatch() {
|
|
|
c.jobComplete(signal)
|
|
|
}
|
|
|
|
|
|
- if c.subscriber.updateCh == nil {
|
|
|
+ if c.subStateSubscriber.updateCh == nil &&
|
|
|
+ c.propertiesSubscriber.updateCh == nil {
|
|
|
continue
|
|
|
}
|
|
|
|
|
@@ -84,6 +76,12 @@ func (c *Conn) dispatch() {
|
|
|
case "org.freedesktop.DBus.Properties.PropertiesChanged":
|
|
|
if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
|
|
|
unitPath = signal.Path
|
|
|
+
|
|
|
+ if len(signal.Body) >= 2 {
|
|
|
+ if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok {
|
|
|
+ c.sendPropertiesUpdate(unitPath, changed)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -169,42 +167,80 @@ type SubStateUpdate struct {
|
|
|
// is full, it attempts to write an error to errCh; if errCh is full, the error
|
|
|
// passes silently.
|
|
|
func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
|
|
|
- c.subscriber.Lock()
|
|
|
- defer c.subscriber.Unlock()
|
|
|
- c.subscriber.updateCh = updateCh
|
|
|
- c.subscriber.errCh = errCh
|
|
|
+ if c == nil {
|
|
|
+ msg := "nil receiver"
|
|
|
+ select {
|
|
|
+ case errCh <- errors.New(msg):
|
|
|
+ default:
|
|
|
+ log.Printf("full error channel while reporting: %s\n", msg)
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ c.subStateSubscriber.Lock()
|
|
|
+ defer c.subStateSubscriber.Unlock()
|
|
|
+ c.subStateSubscriber.updateCh = updateCh
|
|
|
+ c.subStateSubscriber.errCh = errCh
|
|
|
}
|
|
|
|
|
|
-func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) {
|
|
|
- c.subscriber.Lock()
|
|
|
- defer c.subscriber.Unlock()
|
|
|
+func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
|
|
|
+ c.subStateSubscriber.Lock()
|
|
|
+ defer c.subStateSubscriber.Unlock()
|
|
|
+
|
|
|
+ if c.subStateSubscriber.updateCh == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if c.shouldIgnore(path) {
|
|
|
+ isIgnored := c.shouldIgnore(unitPath)
|
|
|
+ defer c.cleanIgnore()
|
|
|
+ if isIgnored {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- info, err := c.GetUnitProperties(string(path))
|
|
|
+ info, err := c.GetUnitPathProperties(unitPath)
|
|
|
if err != nil {
|
|
|
select {
|
|
|
- case c.subscriber.errCh <- err:
|
|
|
+ case c.subStateSubscriber.errCh <- err:
|
|
|
default:
|
|
|
+ log.Printf("full error channel while reporting: %s\n", err)
|
|
|
}
|
|
|
+ return
|
|
|
}
|
|
|
+ defer c.updateIgnore(unitPath, info)
|
|
|
|
|
|
- name := info["Id"].(string)
|
|
|
- substate := info["SubState"].(string)
|
|
|
+ name, ok := info["Id"].(string)
|
|
|
+ if !ok {
|
|
|
+ msg := "failed to cast info.Id"
|
|
|
+ select {
|
|
|
+ case c.subStateSubscriber.errCh <- errors.New(msg):
|
|
|
+ default:
|
|
|
+ log.Printf("full error channel while reporting: %s\n", err)
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ substate, ok := info["SubState"].(string)
|
|
|
+ if !ok {
|
|
|
+ msg := "failed to cast info.SubState"
|
|
|
+ select {
|
|
|
+ case c.subStateSubscriber.errCh <- errors.New(msg):
|
|
|
+ default:
|
|
|
+ log.Printf("full error channel while reporting: %s\n", msg)
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
update := &SubStateUpdate{name, substate}
|
|
|
select {
|
|
|
- case c.subscriber.updateCh <- update:
|
|
|
+ case c.subStateSubscriber.updateCh <- update:
|
|
|
default:
|
|
|
+ msg := "update channel is full"
|
|
|
select {
|
|
|
- case c.subscriber.errCh <- errors.New("update channel full!"):
|
|
|
+ case c.subStateSubscriber.errCh <- errors.New(msg):
|
|
|
default:
|
|
|
+ log.Printf("full error channel while reporting: %s\n", msg)
|
|
|
}
|
|
|
+ return
|
|
|
}
|
|
|
-
|
|
|
- c.updateIgnore(path, info)
|
|
|
}
|
|
|
|
|
|
// The ignore functions work around a wart in the systemd dbus interface.
|
|
@@ -222,29 +258,76 @@ func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) {
|
|
|
// the properties).
|
|
|
|
|
|
func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
|
|
|
- t, ok := c.subscriber.ignore[path]
|
|
|
+ t, ok := c.subStateSubscriber.ignore[path]
|
|
|
return ok && t >= time.Now().UnixNano()
|
|
|
}
|
|
|
|
|
|
func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
|
|
|
- c.cleanIgnore()
|
|
|
+ loadState, ok := info["LoadState"].(string)
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
// unit is unloaded - it will trigger bad systemd dbus behavior
|
|
|
- if info["LoadState"].(string) == "not-found" {
|
|
|
- c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
|
|
|
+ if loadState == "not-found" {
|
|
|
+ c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// without this, ignore would grow unboundedly over time
|
|
|
func (c *Conn) cleanIgnore() {
|
|
|
now := time.Now().UnixNano()
|
|
|
- if c.subscriber.cleanIgnore < now {
|
|
|
- c.subscriber.cleanIgnore = now + cleanIgnoreInterval
|
|
|
+ if c.subStateSubscriber.cleanIgnore < now {
|
|
|
+ c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval
|
|
|
|
|
|
- for p, t := range c.subscriber.ignore {
|
|
|
+ for p, t := range c.subStateSubscriber.ignore {
|
|
|
if t < now {
|
|
|
- delete(c.subscriber.ignore, p)
|
|
|
+ delete(c.subStateSubscriber.ignore, p)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// PropertiesUpdate holds a map of a unit's changed properties
|
|
|
+type PropertiesUpdate struct {
|
|
|
+ UnitName string
|
|
|
+ Changed map[string]dbus.Variant
|
|
|
+}
|
|
|
+
|
|
|
+// SetPropertiesSubscriber writes to updateCh when any unit's properties
|
|
|
+// change. Every property change reported by systemd will be sent; that is, no
|
|
|
+// transitions will be "missed" (as they might be with SetSubStateSubscriber).
|
|
|
+// However, state changes will only be written to the channel with non-blocking
|
|
|
+// writes. If updateCh is full, it attempts to write an error to errCh; if
|
|
|
+// errCh is full, the error passes silently.
|
|
|
+func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) {
|
|
|
+ c.propertiesSubscriber.Lock()
|
|
|
+ defer c.propertiesSubscriber.Unlock()
|
|
|
+ c.propertiesSubscriber.updateCh = updateCh
|
|
|
+ c.propertiesSubscriber.errCh = errCh
|
|
|
+}
|
|
|
+
|
|
|
+// we don't need to worry about shouldIgnore() here because
|
|
|
+// sendPropertiesUpdate doesn't call GetProperties()
|
|
|
+func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) {
|
|
|
+ c.propertiesSubscriber.Lock()
|
|
|
+ defer c.propertiesSubscriber.Unlock()
|
|
|
+
|
|
|
+ if c.propertiesSubscriber.updateCh == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ update := &PropertiesUpdate{unitName(unitPath), changedProps}
|
|
|
+
|
|
|
+ select {
|
|
|
+ case c.propertiesSubscriber.updateCh <- update:
|
|
|
+ default:
|
|
|
+ msg := "update channel is full"
|
|
|
+ select {
|
|
|
+ case c.propertiesSubscriber.errCh <- errors.New(msg):
|
|
|
+ default:
|
|
|
+ log.Printf("full error channel while reporting: %s\n", msg)
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+}
|