|
@@ -36,6 +36,8 @@ import (
|
|
"sync"
|
|
"sync"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
|
+ "cloud.google.com/go/compute/metadata"
|
|
|
|
+ "cloud.google.com/go/internal/version"
|
|
vkit "cloud.google.com/go/logging/apiv2"
|
|
vkit "cloud.google.com/go/logging/apiv2"
|
|
"cloud.google.com/go/logging/internal"
|
|
"cloud.google.com/go/logging/internal"
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/golang/protobuf/proto"
|
|
@@ -77,6 +79,12 @@ const (
|
|
|
|
|
|
// DefaultBufferedByteLimit is the default value for the BufferedByteLimit LoggerOption.
|
|
// DefaultBufferedByteLimit is the default value for the BufferedByteLimit LoggerOption.
|
|
DefaultBufferedByteLimit = 1 << 30 // 1GiB
|
|
DefaultBufferedByteLimit = 1 << 30 // 1GiB
|
|
|
|
+
|
|
|
|
+ // defaultWriteTimeout is the timeout for the underlying write API calls. As
|
|
|
|
+ // write API calls are not idempotent, they are not retried on timeout. This
|
|
|
|
+ // timeout is to allow clients to degrade gracefully if underlying logging
|
|
|
|
+ // service is temporarily impaired for some reason.
|
|
|
|
+ defaultWriteTimeout = 10 * time.Minute
|
|
)
|
|
)
|
|
|
|
|
|
// For testing:
|
|
// For testing:
|
|
@@ -84,16 +92,24 @@ var now = time.Now
|
|
|
|
|
|
// ErrOverflow signals that the number of buffered entries for a Logger
|
|
// ErrOverflow signals that the number of buffered entries for a Logger
|
|
// exceeds its BufferLimit.
|
|
// exceeds its BufferLimit.
|
|
-var ErrOverflow = errors.New("logging: log entry overflowed buffer limits")
|
|
|
|
|
|
+var ErrOverflow = bundler.ErrOverflow
|
|
|
|
+
|
|
|
|
+// ErrOversizedEntry signals that an entry's size exceeds the maximum number of
|
|
|
|
+// bytes that will be sent in a single call to the logging service.
|
|
|
|
+var ErrOversizedEntry = bundler.ErrOversizedItem
|
|
|
|
|
|
// Client is a Logging client. A Client is associated with a single Cloud project.
|
|
// Client is a Logging client. A Client is associated with a single Cloud project.
|
|
type Client struct {
|
|
type Client struct {
|
|
- client *vkit.Client // client for the logging service
|
|
|
|
- projectID string
|
|
|
|
- errc chan error // should be buffered to minimize dropped errors
|
|
|
|
- donec chan struct{} // closed on Client.Close to close Logger bundlers
|
|
|
|
- loggers sync.WaitGroup // so we can wait for loggers to close
|
|
|
|
- closed bool
|
|
|
|
|
|
+ client *vkit.Client // client for the logging service
|
|
|
|
+ parent string // e.g. "projects/proj-id"
|
|
|
|
+ errc chan error // should be buffered to minimize dropped errors
|
|
|
|
+ donec chan struct{} // closed on Client.Close to close Logger bundlers
|
|
|
|
+ loggers sync.WaitGroup // so we can wait for loggers to close
|
|
|
|
+ closed bool
|
|
|
|
+
|
|
|
|
+ mu sync.Mutex
|
|
|
|
+ nErrs int // number of errors we saw
|
|
|
|
+ lastErr error // last error we saw
|
|
|
|
|
|
// OnError is called when an error occurs in a call to Log or Flush. The
|
|
// OnError is called when an error occurs in a call to Log or Flush. The
|
|
// error may be due to an invalid Entry, an overflow because BufferLimit
|
|
// error may be due to an invalid Entry, an overflow because BufferLimit
|
|
@@ -107,15 +123,20 @@ type Client struct {
|
|
OnError func(err error)
|
|
OnError func(err error)
|
|
}
|
|
}
|
|
|
|
|
|
-// NewClient returns a new logging client associated with the provided project ID.
|
|
|
|
|
|
+// NewClient returns a new logging client associated with the provided parent.
|
|
|
|
+// A parent can take any of the following forms:
|
|
|
|
+// projects/PROJECT_ID
|
|
|
|
+// folders/FOLDER_ID
|
|
|
|
+// billingAccounts/ACCOUNT_ID
|
|
|
|
+// organizations/ORG_ID
|
|
|
|
+// for backwards compatibility, a string with no '/' is also allowed and is interpreted
|
|
|
|
+// as a project ID.
|
|
//
|
|
//
|
|
// By default NewClient uses WriteScope. To use a different scope, call
|
|
// By default NewClient uses WriteScope. To use a different scope, call
|
|
// NewClient using a WithScopes option (see https://godoc.org/google.golang.org/api/option#WithScopes).
|
|
// NewClient using a WithScopes option (see https://godoc.org/google.golang.org/api/option#WithScopes).
|
|
-func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
|
|
|
|
- // Check for '/' in project ID to reserve the ability to support various owning resources,
|
|
|
|
- // in the form "{Collection}/{Name}", for instance "organizations/my-org".
|
|
|
|
- if strings.ContainsRune(projectID, '/') {
|
|
|
|
- return nil, errors.New("logging: project ID contains '/'")
|
|
|
|
|
|
+func NewClient(ctx context.Context, parent string, opts ...option.ClientOption) (*Client, error) {
|
|
|
|
+ if !strings.ContainsRune(parent, '/') {
|
|
|
|
+ parent = "projects/" + parent
|
|
}
|
|
}
|
|
opts = append([]option.ClientOption{
|
|
opts = append([]option.ClientOption{
|
|
option.WithEndpoint(internal.ProdAddr),
|
|
option.WithEndpoint(internal.ProdAddr),
|
|
@@ -125,13 +146,13 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
- c.SetGoogleClientInfo("logging", internal.Version)
|
|
|
|
|
|
+ c.SetGoogleClientInfo("gccl", version.Repo)
|
|
client := &Client{
|
|
client := &Client{
|
|
- client: c,
|
|
|
|
- projectID: projectID,
|
|
|
|
- errc: make(chan error, defaultErrorCapacity), // create a small buffer for errors
|
|
|
|
- donec: make(chan struct{}),
|
|
|
|
- OnError: func(e error) { log.Printf("logging client: %v", e) },
|
|
|
|
|
|
+ client: c,
|
|
|
|
+ parent: parent,
|
|
|
|
+ errc: make(chan error, defaultErrorCapacity), // create a small buffer for errors
|
|
|
|
+ donec: make(chan struct{}),
|
|
|
|
+ OnError: func(e error) { log.Printf("logging client: %v", e) },
|
|
}
|
|
}
|
|
// Call the user's function synchronously, to make life easier for them.
|
|
// Call the user's function synchronously, to make life easier for them.
|
|
go func() {
|
|
go func() {
|
|
@@ -143,18 +164,13 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
|
|
if fn := client.OnError; fn != nil {
|
|
if fn := client.OnError; fn != nil {
|
|
fn(err)
|
|
fn(err)
|
|
} else {
|
|
} else {
|
|
- log.Printf("logging (project ID %q): %v", projectID, err)
|
|
|
|
|
|
+ log.Printf("logging (parent %q): %v", parent, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}()
|
|
return client, nil
|
|
return client, nil
|
|
}
|
|
}
|
|
|
|
|
|
-// parent returns the string used in many RPCs to denote the parent resource of the log.
|
|
|
|
-func (c *Client) parent() string {
|
|
|
|
- return "projects/" + c.projectID
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
var unixZeroTimestamp *tspb.Timestamp
|
|
var unixZeroTimestamp *tspb.Timestamp
|
|
|
|
|
|
func init() {
|
|
func init() {
|
|
@@ -170,18 +186,43 @@ func init() {
|
|
// log entry "ping" to a log named "ping".
|
|
// log entry "ping" to a log named "ping".
|
|
func (c *Client) Ping(ctx context.Context) error {
|
|
func (c *Client) Ping(ctx context.Context) error {
|
|
ent := &logpb.LogEntry{
|
|
ent := &logpb.LogEntry{
|
|
- Payload: &logpb.LogEntry_TextPayload{"ping"},
|
|
|
|
|
|
+ Payload: &logpb.LogEntry_TextPayload{TextPayload: "ping"},
|
|
Timestamp: unixZeroTimestamp, // Identical timestamps and insert IDs are both
|
|
Timestamp: unixZeroTimestamp, // Identical timestamps and insert IDs are both
|
|
InsertId: "ping", // necessary for the service to dedup these entries.
|
|
InsertId: "ping", // necessary for the service to dedup these entries.
|
|
}
|
|
}
|
|
_, err := c.client.WriteLogEntries(ctx, &logpb.WriteLogEntriesRequest{
|
|
_, err := c.client.WriteLogEntries(ctx, &logpb.WriteLogEntriesRequest{
|
|
- LogName: internal.LogPath(c.parent(), "ping"),
|
|
|
|
- Resource: &mrpb.MonitoredResource{Type: "global"},
|
|
|
|
|
|
+ LogName: internal.LogPath(c.parent, "ping"),
|
|
|
|
+ Resource: monitoredResource(c.parent),
|
|
Entries: []*logpb.LogEntry{ent},
|
|
Entries: []*logpb.LogEntry{ent},
|
|
})
|
|
})
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// error puts the error on the client's error channel
|
|
|
|
+// without blocking, and records summary error info.
|
|
|
|
+func (c *Client) error(err error) {
|
|
|
|
+ select {
|
|
|
|
+ case c.errc <- err:
|
|
|
|
+ default:
|
|
|
|
+ }
|
|
|
|
+ c.mu.Lock()
|
|
|
|
+ c.lastErr = err
|
|
|
|
+ c.nErrs++
|
|
|
|
+ c.mu.Unlock()
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (c *Client) extractErrorInfo() error {
|
|
|
|
+ var err error
|
|
|
|
+ c.mu.Lock()
|
|
|
|
+ if c.lastErr != nil {
|
|
|
|
+ err = fmt.Errorf("saw %d errors; last: %v", c.nErrs, c.lastErr)
|
|
|
|
+ c.nErrs = 0
|
|
|
|
+ c.lastErr = nil
|
|
|
|
+ }
|
|
|
|
+ c.mu.Unlock()
|
|
|
|
+ return err
|
|
|
|
+}
|
|
|
|
+
|
|
// A Logger is used to write log messages to a single log. It can be configured
|
|
// A Logger is used to write log messages to a single log. It can be configured
|
|
// with a log ID, common monitored resource, and a set of common labels.
|
|
// with a log ID, common monitored resource, and a set of common labels.
|
|
type Logger struct {
|
|
type Logger struct {
|
|
@@ -193,6 +234,7 @@ type Logger struct {
|
|
// Options
|
|
// Options
|
|
commonResource *mrpb.MonitoredResource
|
|
commonResource *mrpb.MonitoredResource
|
|
commonLabels map[string]string
|
|
commonLabels map[string]string
|
|
|
|
+ writeTimeout time.Duration
|
|
}
|
|
}
|
|
|
|
|
|
// A LoggerOption is a configuration option for a Logger.
|
|
// A LoggerOption is a configuration option for a Logger.
|
|
@@ -201,14 +243,80 @@ type LoggerOption interface {
|
|
}
|
|
}
|
|
|
|
|
|
// CommonResource sets the monitored resource associated with all log entries
|
|
// CommonResource sets the monitored resource associated with all log entries
|
|
-// written from a Logger. If not provided, a resource of type "global" is used.
|
|
|
|
-// This value can be overridden by setting an Entry's Resource field.
|
|
|
|
|
|
+// written from a Logger. If not provided, the resource is automatically
|
|
|
|
+// detected based on the running environment. This value can be overridden
|
|
|
|
+// per-entry by setting an Entry's Resource field.
|
|
func CommonResource(r *mrpb.MonitoredResource) LoggerOption { return commonResource{r} }
|
|
func CommonResource(r *mrpb.MonitoredResource) LoggerOption { return commonResource{r} }
|
|
|
|
|
|
type commonResource struct{ *mrpb.MonitoredResource }
|
|
type commonResource struct{ *mrpb.MonitoredResource }
|
|
|
|
|
|
func (r commonResource) set(l *Logger) { l.commonResource = r.MonitoredResource }
|
|
func (r commonResource) set(l *Logger) { l.commonResource = r.MonitoredResource }
|
|
|
|
|
|
|
|
+var detectedResource struct {
|
|
|
|
+ pb *mrpb.MonitoredResource
|
|
|
|
+ once sync.Once
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func detectResource() *mrpb.MonitoredResource {
|
|
|
|
+ detectedResource.once.Do(func() {
|
|
|
|
+ if !metadata.OnGCE() {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ projectID, err := metadata.ProjectID()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ id, err := metadata.InstanceID()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ zone, err := metadata.Zone()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ detectedResource.pb = &mrpb.MonitoredResource{
|
|
|
|
+ Type: "gce_instance",
|
|
|
|
+ Labels: map[string]string{
|
|
|
|
+ "project_id": projectID,
|
|
|
|
+ "instance_id": id,
|
|
|
|
+ "zone": zone,
|
|
|
|
+ },
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ return detectedResource.pb
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+var resourceInfo = map[string]struct{ rtype, label string }{
|
|
|
|
+ "organizations": {"organization", "organization_id"},
|
|
|
|
+ "folders": {"folder", "folder_id"},
|
|
|
|
+ "projects": {"project", "project_id"},
|
|
|
|
+ "billingAccounts": {"billing_account", "account_id"},
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func monitoredResource(parent string) *mrpb.MonitoredResource {
|
|
|
|
+ parts := strings.SplitN(parent, "/", 2)
|
|
|
|
+ if len(parts) != 2 {
|
|
|
|
+ return globalResource(parent)
|
|
|
|
+ }
|
|
|
|
+ info, ok := resourceInfo[parts[0]]
|
|
|
|
+ if !ok {
|
|
|
|
+ return globalResource(parts[1])
|
|
|
|
+ }
|
|
|
|
+ return &mrpb.MonitoredResource{
|
|
|
|
+ Type: info.rtype,
|
|
|
|
+ Labels: map[string]string{info.label: parts[1]},
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func globalResource(projectID string) *mrpb.MonitoredResource {
|
|
|
|
+ return &mrpb.MonitoredResource{
|
|
|
|
+ Type: "global",
|
|
|
|
+ Labels: map[string]string{
|
|
|
|
+ "project_id": projectID,
|
|
|
|
+ },
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
// CommonLabels are labels that apply to all log entries written from a Logger,
|
|
// CommonLabels are labels that apply to all log entries written from a Logger,
|
|
// so that you don't have to repeat them in each log entry's Labels field. If
|
|
// so that you don't have to repeat them in each log entry's Labels field. If
|
|
// any of the log entries contains a (key, value) with the same key that is in
|
|
// any of the log entries contains a (key, value) with the same key that is in
|
|
@@ -220,6 +328,15 @@ type commonLabels map[string]string
|
|
|
|
|
|
func (c commonLabels) set(l *Logger) { l.commonLabels = c }
|
|
func (c commonLabels) set(l *Logger) { l.commonLabels = c }
|
|
|
|
|
|
|
|
+// ConcurrentWriteLimit determines how many goroutines will send log entries to the
|
|
|
|
+// underlying service. The default is 1. Set ConcurrentWriteLimit to a higher value to
|
|
|
|
+// increase throughput.
|
|
|
|
+func ConcurrentWriteLimit(n int) LoggerOption { return concurrentWriteLimit(n) }
|
|
|
|
+
|
|
|
|
+type concurrentWriteLimit int
|
|
|
|
+
|
|
|
|
+func (c concurrentWriteLimit) set(l *Logger) { l.bundler.HandlerLimit = int(c) }
|
|
|
|
+
|
|
// DelayThreshold is the maximum amount of time that an entry should remain
|
|
// DelayThreshold is the maximum amount of time that an entry should remain
|
|
// buffered in memory before a call to the logging service is triggered. Larger
|
|
// buffered in memory before a call to the logging service is triggered. Larger
|
|
// values of DelayThreshold will generally result in fewer calls to the logging
|
|
// values of DelayThreshold will generally result in fewer calls to the logging
|
|
@@ -256,10 +373,10 @@ type entryByteThreshold int
|
|
func (e entryByteThreshold) set(l *Logger) { l.bundler.BundleByteThreshold = int(e) }
|
|
func (e entryByteThreshold) set(l *Logger) { l.bundler.BundleByteThreshold = int(e) }
|
|
|
|
|
|
// EntryByteLimit is the maximum number of bytes of entries that will be sent
|
|
// EntryByteLimit is the maximum number of bytes of entries that will be sent
|
|
-// in a single call to the logging service. This option limits the size of a
|
|
|
|
-// single RPC payload, to account for network or service issues with large
|
|
|
|
-// RPCs. If EntryByteLimit is smaller than EntryByteThreshold, the latter has
|
|
|
|
-// no effect.
|
|
|
|
|
|
+// in a single call to the logging service. ErrOversizedEntry is returned if an
|
|
|
|
+// entry exceeds EntryByteLimit. This option limits the size of a single RPC
|
|
|
|
+// payload, to account for network or service issues with large RPCs. If
|
|
|
|
+// EntryByteLimit is smaller than EntryByteThreshold, the latter has no effect.
|
|
// The default is zero, meaning there is no limit.
|
|
// The default is zero, meaning there is no limit.
|
|
func EntryByteLimit(n int) LoggerOption { return entryByteLimit(n) }
|
|
func EntryByteLimit(n int) LoggerOption { return entryByteLimit(n) }
|
|
|
|
|
|
@@ -287,15 +404,17 @@ func (b bufferedByteLimit) set(l *Logger) { l.bundler.BufferedByteLimit = int(b)
|
|
// characters: [A-Za-z0-9]; and punctuation characters: forward-slash,
|
|
// characters: [A-Za-z0-9]; and punctuation characters: forward-slash,
|
|
// underscore, hyphen, and period.
|
|
// underscore, hyphen, and period.
|
|
func (c *Client) Logger(logID string, opts ...LoggerOption) *Logger {
|
|
func (c *Client) Logger(logID string, opts ...LoggerOption) *Logger {
|
|
|
|
+ r := detectResource()
|
|
|
|
+ if r == nil {
|
|
|
|
+ r = monitoredResource(c.parent)
|
|
|
|
+ }
|
|
l := &Logger{
|
|
l := &Logger{
|
|
client: c,
|
|
client: c,
|
|
- logName: internal.LogPath(c.parent(), logID),
|
|
|
|
- commonResource: &mrpb.MonitoredResource{Type: "global"},
|
|
|
|
|
|
+ logName: internal.LogPath(c.parent, logID),
|
|
|
|
+ commonResource: r,
|
|
}
|
|
}
|
|
- // TODO(jba): determine the right context for the bundle handler.
|
|
|
|
- ctx := context.TODO()
|
|
|
|
l.bundler = bundler.NewBundler(&logpb.LogEntry{}, func(entries interface{}) {
|
|
l.bundler = bundler.NewBundler(&logpb.LogEntry{}, func(entries interface{}) {
|
|
- l.writeLogEntries(ctx, entries.([]*logpb.LogEntry))
|
|
|
|
|
|
+ l.writeLogEntries(entries.([]*logpb.LogEntry))
|
|
})
|
|
})
|
|
l.bundler.DelayThreshold = DefaultDelayThreshold
|
|
l.bundler.DelayThreshold = DefaultDelayThreshold
|
|
l.bundler.BundleCountThreshold = DefaultEntryCountThreshold
|
|
l.bundler.BundleCountThreshold = DefaultEntryCountThreshold
|
|
@@ -304,16 +423,18 @@ func (c *Client) Logger(logID string, opts ...LoggerOption) *Logger {
|
|
for _, opt := range opts {
|
|
for _, opt := range opts {
|
|
opt.set(l)
|
|
opt.set(l)
|
|
}
|
|
}
|
|
-
|
|
|
|
l.stdLoggers = map[Severity]*log.Logger{}
|
|
l.stdLoggers = map[Severity]*log.Logger{}
|
|
for s := range severityName {
|
|
for s := range severityName {
|
|
l.stdLoggers[s] = log.New(severityWriter{l, s}, "", 0)
|
|
l.stdLoggers[s] = log.New(severityWriter{l, s}, "", 0)
|
|
}
|
|
}
|
|
|
|
+
|
|
c.loggers.Add(1)
|
|
c.loggers.Add(1)
|
|
|
|
+ // Start a goroutine that cleans up the bundler, its channel
|
|
|
|
+ // and the writer goroutines when the client is closed.
|
|
go func() {
|
|
go func() {
|
|
defer c.loggers.Done()
|
|
defer c.loggers.Done()
|
|
<-c.donec
|
|
<-c.donec
|
|
- l.bundler.Close()
|
|
|
|
|
|
+ l.bundler.Flush()
|
|
}()
|
|
}()
|
|
return l
|
|
return l
|
|
}
|
|
}
|
|
@@ -331,7 +452,7 @@ func (w severityWriter) Write(p []byte) (n int, err error) {
|
|
return len(p), nil
|
|
return len(p), nil
|
|
}
|
|
}
|
|
|
|
|
|
-// Close closes the client.
|
|
|
|
|
|
+// Close waits for all opened loggers to be flushed and closes the client.
|
|
func (c *Client) Close() error {
|
|
func (c *Client) Close() error {
|
|
if c.closed {
|
|
if c.closed {
|
|
return nil
|
|
return nil
|
|
@@ -340,9 +461,12 @@ func (c *Client) Close() error {
|
|
c.loggers.Wait() // wait for all bundlers to flush and close
|
|
c.loggers.Wait() // wait for all bundlers to flush and close
|
|
// Now there can be no more errors.
|
|
// Now there can be no more errors.
|
|
close(c.errc) // terminate error goroutine
|
|
close(c.errc) // terminate error goroutine
|
|
- // Return only the first error. Since all clients share an underlying connection,
|
|
|
|
- // Closes after the first always report a "connection is closing" error.
|
|
|
|
- err := c.client.Close()
|
|
|
|
|
|
+ // Prefer errors arising from logging to the error returned from Close.
|
|
|
|
+ err := c.extractErrorInfo()
|
|
|
|
+ err2 := c.client.Close()
|
|
|
|
+ if err == nil {
|
|
|
|
+ err = err2
|
|
|
|
+ }
|
|
c.closed = true
|
|
c.closed = true
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
@@ -417,9 +541,8 @@ type Entry struct {
|
|
// The zero value is Default.
|
|
// The zero value is Default.
|
|
Severity Severity
|
|
Severity Severity
|
|
|
|
|
|
- // Payload must be either a string or something that
|
|
|
|
- // marshals via the encoding/json package to a JSON object
|
|
|
|
- // (and not any other type of JSON value).
|
|
|
|
|
|
+ // Payload must be either a string, or something that marshals via the
|
|
|
|
+ // encoding/json package to a JSON object (and not any other type of JSON value).
|
|
Payload interface{}
|
|
Payload interface{}
|
|
|
|
|
|
// Labels optionally specifies key/value labels for the log entry.
|
|
// Labels optionally specifies key/value labels for the log entry.
|
|
@@ -448,10 +571,13 @@ type Entry struct {
|
|
// reading entries. It is an error to set it when writing entries.
|
|
// reading entries. It is an error to set it when writing entries.
|
|
LogName string
|
|
LogName string
|
|
|
|
|
|
- // Resource is the monitored resource associated with the entry. It is set
|
|
|
|
- // by the client when reading entries. It is an error to set it when
|
|
|
|
- // writing entries.
|
|
|
|
|
|
+ // Resource is the monitored resource associated with the entry.
|
|
Resource *mrpb.MonitoredResource
|
|
Resource *mrpb.MonitoredResource
|
|
|
|
+
|
|
|
|
+ // Trace is the resource name of the trace associated with the log entry,
|
|
|
|
+ // if any. If it contains a relative resource name, the name is assumed to
|
|
|
|
+ // be relative to //tracing.googleapis.com.
|
|
|
|
+ Trace string
|
|
}
|
|
}
|
|
|
|
|
|
// HTTPRequest contains an http.Request as well as additional
|
|
// HTTPRequest contains an http.Request as well as additional
|
|
@@ -476,6 +602,10 @@ type HTTPRequest struct {
|
|
// received until the response was sent.
|
|
// received until the response was sent.
|
|
Latency time.Duration
|
|
Latency time.Duration
|
|
|
|
|
|
|
|
+ // LocalIP is the IP address (IPv4 or IPv6) of the origin server that the request
|
|
|
|
+ // was sent to.
|
|
|
|
+ LocalIP string
|
|
|
|
+
|
|
// RemoteIP is the IP address (IPv4 or IPv6) of the client that issued the
|
|
// RemoteIP is the IP address (IPv4 or IPv6) of the client that issued the
|
|
// HTTP request. Examples: "192.168.1.1", "FE80::0202:B3FF:FE1E:8329".
|
|
// HTTP request. Examples: "192.168.1.1", "FE80::0202:B3FF:FE1E:8329".
|
|
RemoteIP string
|
|
RemoteIP string
|
|
@@ -499,19 +629,23 @@ func fromHTTPRequest(r *HTTPRequest) *logtypepb.HttpRequest {
|
|
}
|
|
}
|
|
u := *r.Request.URL
|
|
u := *r.Request.URL
|
|
u.Fragment = ""
|
|
u.Fragment = ""
|
|
- return &logtypepb.HttpRequest{
|
|
|
|
|
|
+ pb := &logtypepb.HttpRequest{
|
|
RequestMethod: r.Request.Method,
|
|
RequestMethod: r.Request.Method,
|
|
RequestUrl: u.String(),
|
|
RequestUrl: u.String(),
|
|
RequestSize: r.RequestSize,
|
|
RequestSize: r.RequestSize,
|
|
Status: int32(r.Status),
|
|
Status: int32(r.Status),
|
|
ResponseSize: r.ResponseSize,
|
|
ResponseSize: r.ResponseSize,
|
|
- Latency: ptypes.DurationProto(r.Latency),
|
|
|
|
UserAgent: r.Request.UserAgent(),
|
|
UserAgent: r.Request.UserAgent(),
|
|
|
|
+ ServerIp: r.LocalIP,
|
|
RemoteIp: r.RemoteIP, // TODO(jba): attempt to parse http.Request.RemoteAddr?
|
|
RemoteIp: r.RemoteIP, // TODO(jba): attempt to parse http.Request.RemoteAddr?
|
|
Referer: r.Request.Referer(),
|
|
Referer: r.Request.Referer(),
|
|
CacheHit: r.CacheHit,
|
|
CacheHit: r.CacheHit,
|
|
CacheValidatedWithOriginServer: r.CacheValidatedWithOriginServer,
|
|
CacheValidatedWithOriginServer: r.CacheValidatedWithOriginServer,
|
|
}
|
|
}
|
|
|
|
+ if r.Latency != 0 {
|
|
|
|
+ pb.Latency = ptypes.DurationProto(r.Latency)
|
|
|
|
+ }
|
|
|
|
+ return pb
|
|
}
|
|
}
|
|
|
|
|
|
// toProtoStruct converts v, which must marshal into a JSON object,
|
|
// toProtoStruct converts v, which must marshal into a JSON object,
|
|
@@ -521,13 +655,19 @@ func toProtoStruct(v interface{}) (*structpb.Struct, error) {
|
|
if s, ok := v.(*structpb.Struct); ok {
|
|
if s, ok := v.(*structpb.Struct); ok {
|
|
return s, nil
|
|
return s, nil
|
|
}
|
|
}
|
|
- // v is a Go struct that supports JSON marshalling. We want a Struct
|
|
|
|
|
|
+ // v is a Go value that supports JSON marshalling. We want a Struct
|
|
// protobuf. Some day we may have a more direct way to get there, but right
|
|
// protobuf. Some day we may have a more direct way to get there, but right
|
|
- // now the only way is to marshal the Go struct to JSON, unmarshal into a
|
|
|
|
|
|
+ // now the only way is to marshal the Go value to JSON, unmarshal into a
|
|
// map, and then build the Struct proto from the map.
|
|
// map, and then build the Struct proto from the map.
|
|
- jb, err := json.Marshal(v)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, fmt.Errorf("logging: json.Marshal: %v", err)
|
|
|
|
|
|
+ var jb []byte
|
|
|
|
+ var err error
|
|
|
|
+ if raw, ok := v.(json.RawMessage); ok { // needed for Go 1.7 and below
|
|
|
|
+ jb = []byte(raw)
|
|
|
|
+ } else {
|
|
|
|
+ jb, err = json.Marshal(v)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, fmt.Errorf("logging: json.Marshal: %v", err)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
var m map[string]interface{}
|
|
var m map[string]interface{}
|
|
err = json.Unmarshal(jb, &m)
|
|
err = json.Unmarshal(jb, &m)
|
|
@@ -548,21 +688,21 @@ func jsonMapToProtoStruct(m map[string]interface{}) *structpb.Struct {
|
|
func jsonValueToStructValue(v interface{}) *structpb.Value {
|
|
func jsonValueToStructValue(v interface{}) *structpb.Value {
|
|
switch x := v.(type) {
|
|
switch x := v.(type) {
|
|
case bool:
|
|
case bool:
|
|
- return &structpb.Value{Kind: &structpb.Value_BoolValue{x}}
|
|
|
|
|
|
+ return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: x}}
|
|
case float64:
|
|
case float64:
|
|
- return &structpb.Value{Kind: &structpb.Value_NumberValue{x}}
|
|
|
|
|
|
+ return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: x}}
|
|
case string:
|
|
case string:
|
|
- return &structpb.Value{Kind: &structpb.Value_StringValue{x}}
|
|
|
|
|
|
+ return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: x}}
|
|
case nil:
|
|
case nil:
|
|
return &structpb.Value{Kind: &structpb.Value_NullValue{}}
|
|
return &structpb.Value{Kind: &structpb.Value_NullValue{}}
|
|
case map[string]interface{}:
|
|
case map[string]interface{}:
|
|
- return &structpb.Value{Kind: &structpb.Value_StructValue{jsonMapToProtoStruct(x)}}
|
|
|
|
|
|
+ return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: jsonMapToProtoStruct(x)}}
|
|
case []interface{}:
|
|
case []interface{}:
|
|
var vals []*structpb.Value
|
|
var vals []*structpb.Value
|
|
for _, e := range x {
|
|
for _, e := range x {
|
|
vals = append(vals, jsonValueToStructValue(e))
|
|
vals = append(vals, jsonValueToStructValue(e))
|
|
}
|
|
}
|
|
- return &structpb.Value{Kind: &structpb.Value_ListValue{&structpb.ListValue{vals}}}
|
|
|
|
|
|
+ return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}}
|
|
default:
|
|
default:
|
|
panic(fmt.Sprintf("bad type %T for JSON value", v))
|
|
panic(fmt.Sprintf("bad type %T for JSON value", v))
|
|
}
|
|
}
|
|
@@ -590,38 +730,37 @@ func (l *Logger) LogSync(ctx context.Context, e Entry) error {
|
|
func (l *Logger) Log(e Entry) {
|
|
func (l *Logger) Log(e Entry) {
|
|
ent, err := toLogEntry(e)
|
|
ent, err := toLogEntry(e)
|
|
if err != nil {
|
|
if err != nil {
|
|
- l.error(err)
|
|
|
|
|
|
+ l.client.error(err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
if err := l.bundler.Add(ent, proto.Size(ent)); err != nil {
|
|
if err := l.bundler.Add(ent, proto.Size(ent)); err != nil {
|
|
- l.error(err)
|
|
|
|
|
|
+ l.client.error(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Flush blocks until all currently buffered log entries are sent.
|
|
// Flush blocks until all currently buffered log entries are sent.
|
|
-func (l *Logger) Flush() {
|
|
|
|
|
|
+//
|
|
|
|
+// If any errors occurred since the last call to Flush from any Logger, or the
|
|
|
|
+// creation of the client if this is the first call, then Flush returns a non-nil
|
|
|
|
+// error with summary information about the errors. This information is unlikely to
|
|
|
|
+// be actionable. For more accurate error reporting, set Client.OnError.
|
|
|
|
+func (l *Logger) Flush() error {
|
|
l.bundler.Flush()
|
|
l.bundler.Flush()
|
|
|
|
+ return l.client.extractErrorInfo()
|
|
}
|
|
}
|
|
|
|
|
|
-func (l *Logger) writeLogEntries(ctx context.Context, entries []*logpb.LogEntry) {
|
|
|
|
|
|
+func (l *Logger) writeLogEntries(entries []*logpb.LogEntry) {
|
|
req := &logpb.WriteLogEntriesRequest{
|
|
req := &logpb.WriteLogEntriesRequest{
|
|
LogName: l.logName,
|
|
LogName: l.logName,
|
|
Resource: l.commonResource,
|
|
Resource: l.commonResource,
|
|
Labels: l.commonLabels,
|
|
Labels: l.commonLabels,
|
|
Entries: entries,
|
|
Entries: entries,
|
|
}
|
|
}
|
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
|
|
|
+ defer cancel()
|
|
_, err := l.client.client.WriteLogEntries(ctx, req)
|
|
_, err := l.client.client.WriteLogEntries(ctx, req)
|
|
if err != nil {
|
|
if err != nil {
|
|
- l.error(err)
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// error puts the error on the client's error channel
|
|
|
|
-// without blocking.
|
|
|
|
-func (l *Logger) error(err error) {
|
|
|
|
- select {
|
|
|
|
- case l.client.errc <- err:
|
|
|
|
- default:
|
|
|
|
|
|
+ l.client.error(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -658,17 +797,18 @@ func toLogEntry(e Entry) (*logpb.LogEntry, error) {
|
|
HttpRequest: fromHTTPRequest(e.HTTPRequest),
|
|
HttpRequest: fromHTTPRequest(e.HTTPRequest),
|
|
Operation: e.Operation,
|
|
Operation: e.Operation,
|
|
Labels: e.Labels,
|
|
Labels: e.Labels,
|
|
|
|
+ Trace: e.Trace,
|
|
|
|
+ Resource: e.Resource,
|
|
}
|
|
}
|
|
-
|
|
|
|
switch p := e.Payload.(type) {
|
|
switch p := e.Payload.(type) {
|
|
case string:
|
|
case string:
|
|
- ent.Payload = &logpb.LogEntry_TextPayload{p}
|
|
|
|
|
|
+ ent.Payload = &logpb.LogEntry_TextPayload{TextPayload: p}
|
|
default:
|
|
default:
|
|
s, err := toProtoStruct(p)
|
|
s, err := toProtoStruct(p)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
- ent.Payload = &logpb.LogEntry_JsonPayload{s}
|
|
|
|
|
|
+ ent.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
|
|
}
|
|
}
|
|
return ent, nil
|
|
return ent, nil
|
|
}
|
|
}
|