observer.go 4.3 KB
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import (
	"strconv"
	"sync"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/uber/jaeger-lib/metrics"

	jaeger "github.com/uber/jaeger-client-go"
)

const defaultMaxNumberOfEndpoints = 200

// Observer is an observer that can emit RPC metrics.
type Observer struct {
	metricsByEndpoint *MetricsByEndpoint
}

// NewObserver creates a new observer that can emit RPC metrics.
func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
	return &Observer{
		metricsByEndpoint: newMetricsByEndpoint(
			metricsFactory,
			normalizer,
			defaultMaxNumberOfEndpoints,
		),
	}
}

// OnStartSpan creates a new Observer for the span.
func (o *Observer) OnStartSpan(
	operationName string,
	options opentracing.StartSpanOptions,
) jaeger.SpanObserver {
	return NewSpanObserver(o.metricsByEndpoint, operationName, options)
}

// SpanKind identifies the span as inboud, outbound, or internal
type SpanKind int

const (
	// Local span kind
	Local SpanKind = iota
	// Inbound span kind
	Inbound
	// Outbound span kind
	Outbound
)

// SpanObserver collects RPC metrics
type SpanObserver struct {
	metricsByEndpoint *MetricsByEndpoint
	operationName     string
	startTime         time.Time
	mux               sync.Mutex
	kind              SpanKind
	httpStatusCode    uint16
	err               bool
}

// NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
func NewSpanObserver(
	metricsByEndpoint *MetricsByEndpoint,
	operationName string,
	options opentracing.StartSpanOptions,
) *SpanObserver {
	so := &SpanObserver{
		metricsByEndpoint: metricsByEndpoint,
		operationName:     operationName,
		startTime:         options.StartTime,
	}
	for k, v := range options.Tags {
		so.handleTagInLock(k, v)
	}
	return so
}

// handleTags watches for special tags
// - SpanKind
// - HttpStatusCode
// - Error
func (so *SpanObserver) handleTagInLock(key string, value interface{}) {
	if key == string(ext.SpanKind) {
		if v, ok := value.(ext.SpanKindEnum); ok {
			value = string(v)
		}
		if v, ok := value.(string); ok {
			if v == string(ext.SpanKindRPCClientEnum) {
				so.kind = Outbound
			} else if v == string(ext.SpanKindRPCServerEnum) {
				so.kind = Inbound
			}
		}
		return
	}
	if key == string(ext.HTTPStatusCode) {
		if v, ok := value.(uint16); ok {
			so.httpStatusCode = v
		} else if v, ok := value.(int); ok {
			so.httpStatusCode = uint16(v)
		} else if v, ok := value.(string); ok {
			if vv, err := strconv.Atoi(v); err == nil {
				so.httpStatusCode = uint16(vv)
			}
		}
		return
	}
	if key == string(ext.Error) {
		if v, ok := value.(bool); ok {
			so.err = v
		} else if v, ok := value.(string); ok {
			if vv, err := strconv.ParseBool(v); err == nil {
				so.err = vv
			}
		}
		return
	}
}

// OnFinish emits the RPC metrics. It only has an effect when operation name
// is not blank, and the span kind is an RPC server.
func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
	so.mux.Lock()
	defer so.mux.Unlock()

	if so.operationName == "" || so.kind != Inbound {
		return
	}

	mets := so.metricsByEndpoint.get(so.operationName)
	latency := options.FinishTime.Sub(so.startTime)
	if so.err {
		mets.RequestCountFailures.Inc(1)
		mets.RequestLatencyFailures.Record(latency)
	} else {
		mets.RequestCountSuccess.Inc(1)
		mets.RequestLatencySuccess.Record(latency)
	}
	mets.recordHTTPStatusCode(so.httpStatusCode)
}

// OnSetOperationName records new operation name.
func (so *SpanObserver) OnSetOperationName(operationName string) {
	so.mux.Lock()
	so.operationName = operationName
	so.mux.Unlock()
}

// OnSetTag implements SpanObserver
func (so *SpanObserver) OnSetTag(key string, value interface{}) {
	so.mux.Lock()
	so.handleTagInLock(key, value)
	so.mux.Unlock()
}