throttler.go 5.6 KB
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"fmt"
	"net/url"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"

	"github.com/uber/jaeger-client-go"
	"github.com/uber/jaeger-client-go/utils"
)

const (
	// minimumCredits is the minimum amount of credits necessary to not be throttled.
	// i.e. if currentCredits > minimumCredits, then the operation will not be throttled.
	minimumCredits = 1.0
)

var (
	errorUUIDNotSet = errors.New("Throttler UUID must be set")
)

type operationBalance struct {
	Operation string  `json:"operation"`
	Balance   float64 `json:"balance"`
}

type creditResponse struct {
	Balances []operationBalance `json:"balances"`
}

type httpCreditManagerProxy struct {
	hostPort string
}

func newHTTPCreditManagerProxy(hostPort string) *httpCreditManagerProxy {
	return &httpCreditManagerProxy{
		hostPort: hostPort,
	}
}

// N.B. Operations list must not be empty.
func (m *httpCreditManagerProxy) FetchCredits(uuid, serviceName string, operations []string) (*creditResponse, error) {
	params := url.Values{}
	params.Set("service", serviceName)
	params.Set("uuid", uuid)
	for _, op := range operations {
		params.Add("operations", op)
	}
	var resp creditResponse
	if err := utils.GetJSON(fmt.Sprintf("http://%s/credits?%s", m.hostPort, params.Encode()), &resp); err != nil {
		return nil, errors.Wrap(err, "Failed to receive credits from agent")
	}
	return &resp, nil
}

// Throttler retrieves credits from agent and uses it to throttle operations.
type Throttler struct {
	options

	mux           sync.RWMutex
	service       string
	uuid          atomic.Value
	creditManager *httpCreditManagerProxy
	credits       map[string]float64 // map of operation->credits
	close         chan struct{}
	stopped       sync.WaitGroup
}

// NewThrottler returns a Throttler that polls agent for credits and uses them to throttle
// the service.
func NewThrottler(service string, options ...Option) *Throttler {
	opts := applyOptions(options...)
	creditManager := newHTTPCreditManagerProxy(opts.hostPort)
	t := &Throttler{
		options:       opts,
		creditManager: creditManager,
		service:       service,
		credits:       make(map[string]float64),
		close:         make(chan struct{}),
	}
	t.stopped.Add(1)
	go t.pollManager()
	return t
}

// IsAllowed implements Throttler#IsAllowed.
func (t *Throttler) IsAllowed(operation string) bool {
	t.mux.Lock()
	defer t.mux.Unlock()
	value, ok := t.credits[operation]
	if !ok || value == 0 {
		if !ok {
			// NOTE: This appears to be a no-op at first glance, but it stores
			// the operation key in the map. Necessary for functionality of
			// Throttler#operations method.
			t.credits[operation] = 0
		}
		if !t.synchronousInitialization {
			t.metrics.ThrottledDebugSpans.Inc(1)
			return false
		}
		// If it is the first time this operation is being checked, synchronously fetch
		// the credits.
		credits, err := t.fetchCredits([]string{operation})
		if err != nil {
			// Failed to receive credits from agent, try again next time
			t.logger.Error("Failed to fetch credits: " + err.Error())
			return false
		}
		if len(credits.Balances) == 0 {
			// This shouldn't happen but just in case
			return false
		}
		for _, opBalance := range credits.Balances {
			t.credits[opBalance.Operation] += opBalance.Balance
		}
	}
	return t.isAllowed(operation)
}

// Close stops the throttler from fetching credits from remote.
func (t *Throttler) Close() error {
	close(t.close)
	t.stopped.Wait()
	return nil
}

// SetProcess implements ProcessSetter#SetProcess. It's imperative that the UUID is set before any remote
// requests are made.
func (t *Throttler) SetProcess(process jaeger.Process) {
	if process.UUID != "" {
		t.uuid.Store(process.UUID)
	}
}

// N.B. This function must be called with the Write Lock
func (t *Throttler) isAllowed(operation string) bool {
	credits := t.credits[operation]
	if credits < minimumCredits {
		t.metrics.ThrottledDebugSpans.Inc(1)
		return false
	}
	t.credits[operation] = credits - minimumCredits
	return true
}

func (t *Throttler) pollManager() {
	defer t.stopped.Done()
	ticker := time.NewTicker(t.refreshInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			t.refreshCredits()
		case <-t.close:
			return
		}
	}
}

func (t *Throttler) operations() []string {
	t.mux.RLock()
	defer t.mux.RUnlock()
	operations := make([]string, 0, len(t.credits))
	for op := range t.credits {
		operations = append(operations, op)
	}
	return operations
}

func (t *Throttler) refreshCredits() {
	operations := t.operations()
	if len(operations) == 0 {
		return
	}
	newCredits, err := t.fetchCredits(operations)
	if err != nil {
		t.metrics.ThrottlerUpdateFailure.Inc(1)
		t.logger.Error("Failed to fetch credits: " + err.Error())
		return
	}
	t.metrics.ThrottlerUpdateSuccess.Inc(1)

	t.mux.Lock()
	defer t.mux.Unlock()
	for _, opBalance := range newCredits.Balances {
		t.credits[opBalance.Operation] += opBalance.Balance
	}
}

func (t *Throttler) fetchCredits(operations []string) (*creditResponse, error) {
	uuid := t.uuid.Load()
	uuidStr, _ := uuid.(string)
	if uuid == nil || uuidStr == "" {
		return nil, errorUUIDNotSet
	}
	return t.creditManager.FetchCredits(uuidStr, t.service, operations)
}