// Package quantile computes approximate quantiles over an unbounded data
// stream within low memory and CPU bounds.
//
// A small amount of accuracy is traded to achieve the above properties.
//
// Multiple streams can be merged before calling Query to generate a single set
// of results. This is meaningful when the streams represent the same type of
// data. See Merge and Samples.
//
// For more detailed information about the algorithm used, see:
//
// Effective Computation of Biased Quantiles over Data Streams
//
// http://www.cs.rutgers.edu/~muthu/bquant.pdf
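//
// A typical use might look like the following sketch; the targets, error
// bounds, and the observations slice below are illustrative assumptions, not
// part of this package's API:
//
//	q := quantile.NewTargeted(map[float64]float64{
//		0.50: 0.05,
//		0.90: 0.01,
//		0.99: 0.001,
//	})
//	for _, v := range observations { // observations: a hypothetical []float64
//		q.Insert(v)
//	}
//	median := q.Query(0.50)
//	p99 := q.Query(0.99)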
package quantile

import (
	"math"
	"sort"
)

// Sample holds an observed value and meta information for compression. JSON
// tags have been added for convenience.
type Sample struct {
	Value float64 `json:",string"`
	Width float64 `json:",string"`
	Delta float64 `json:",string"`
}

// Samples represents a slice of samples. It implements sort.Interface.
type Samples []Sample

func (a Samples) Len() int           { return len(a) }
func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
func (a Samples) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

type invariant func(s *stream, r float64) float64

// NewLowBiased returns an initialized Stream for low-biased quantiles
// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
// error guarantees can still be given even for the lower ranks of the data
// distribution.
//
// The provided epsilon is a relative error, i.e. the true quantile of a value
// returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
// properties.
func NewLowBiased(epsilon float64) *Stream {
	ƒ := func(s *stream, r float64) float64 {
		return 2 * epsilon * r
	}
	return newStream(ƒ)
}
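
// To make the low-biased guarantee concrete: with epsilon = 0.01, a value
// reported for the 0.10 quantile is guaranteed to have a true quantile
// between (1-0.01)*0.10 = 0.099 and (1+0.01)*0.10 = 0.101. A minimal,
// hypothetical construction:
//
//	s := quantile.NewLowBiased(0.01)
//	s.Insert(42)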

// NewHighBiased returns an initialized Stream for high-biased quantiles
// (e.g. 0.99, 0.9, 0.5) where the needed quantiles are not known a priori, but
// error guarantees can still be given even for the higher ranks of the data
// distribution.
//
// The provided epsilon is a relative error, i.e. the true quantile of a value
// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
// properties.
func NewHighBiased(epsilon float64) *Stream {
	ƒ := func(s *stream, r float64) float64 {
		return 2 * epsilon * (s.n - r)
	}
	return newStream(ƒ)
}
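
// Concretely for the high-biased case: with epsilon = 0.01, a value reported
// for the 0.99 quantile is guaranteed to have a true quantile between
// 1-(1+0.01)*(1-0.99) = 0.9899 and 1-(1-0.01)*(1-0.99) = 0.9901.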

// NewTargeted returns an initialized Stream concerned with a particular set of
// quantile values that are supplied a priori. Knowing these a priori reduces
// space and computation time. The targets map maps the desired quantiles to
// their absolute errors, i.e. the true quantile of a value returned by a query
// is guaranteed to be within (Quantile±Epsilon).
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
func NewTargeted(targetMap map[float64]float64) *Stream {
	// Convert map to slice to avoid slow iterations on a map.
	// ƒ is called on the hot path, so converting the map to a slice
	// beforehand results in significant CPU savings.
	targets := targetMapToSlice(targetMap)

	ƒ := func(s *stream, r float64) float64 {
		var m = math.MaxFloat64
		var f float64
		for _, t := range targets {
			if t.quantile*s.n <= r {
				f = (2 * t.epsilon * r) / t.quantile
			} else {
				f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
			}
			if f < m {
				m = f
			}
		}
		return m
	}
	return newStream(ƒ)
}
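
// As a concrete illustration of the absolute-error guarantee, with a single
// hypothetical target:
//
//	s := quantile.NewTargeted(map[float64]float64{0.99: 0.001})
//
// a value returned by s.Query(0.99) is guaranteed to have a true quantile
// between 0.989 and 0.991, i.e. within (0.99 ± 0.001).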

type target struct {
	quantile float64
	epsilon  float64
}

func targetMapToSlice(targetMap map[float64]float64) []target {
	targets := make([]target, 0, len(targetMap))

	for quantile, epsilon := range targetMap {
		t := target{
			quantile: quantile,
			epsilon:  epsilon,
		}
		targets = append(targets, t)
	}

	return targets
}

// Stream computes quantiles for a stream of float64s. It is not thread-safe by
// design. Take care when using across multiple goroutines.
type Stream struct {
	*stream
	b      Samples
	sorted bool
}
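
// A minimal sketch of guarding a Stream for use from multiple goroutines,
// assuming a single mutex is acceptable; the wrapper type below is
// hypothetical and not part of this package:
//
//	type safeStream struct {
//		mu sync.Mutex // requires importing "sync"
//		s  *quantile.Stream
//	}
//
//	func (ss *safeStream) Insert(v float64) {
//		ss.mu.Lock()
//		defer ss.mu.Unlock()
//		ss.s.Insert(v)
//	}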

func newStream(ƒ invariant) *Stream {
	x := &stream{ƒ: ƒ}
	return &Stream{x, make(Samples, 0, 500), true}
}

// Insert inserts v into the stream.
func (s *Stream) Insert(v float64) {
	s.insert(Sample{Value: v, Width: 1})
}

func (s *Stream) insert(sample Sample) {
	s.b = append(s.b, sample)
	s.sorted = false
	if len(s.b) == cap(s.b) {
		s.flush()
	}
}

// Query returns the computed value for the qth quantile. If s was created with
// NewTargeted, and q is not in the set of quantiles provided a priori, Query
// will return an unspecified result.
func (s *Stream) Query(q float64) float64 {
	if !s.flushed() {
		// Fast path when there hasn't been enough data for a flush;
		// this also yields better accuracy for small sets of data.
		l := len(s.b)
		if l == 0 {
			return 0
		}
		i := int(math.Ceil(float64(l) * q))
		if i > 0 {
			i--
		}
		s.maybeSort()
		return s.b[i].Value
	}
	s.flush()
	return s.stream.query(q)
}

// Merge merges samples into the underlying stream's samples. This is handy when
// merging multiple streams from separate threads, database shards, etc.
//
// ATTENTION: This method is broken and does not yield correct results. The
// underlying algorithm is not capable of merging streams correctly.
func (s *Stream) Merge(samples Samples) {
	sort.Sort(samples)
	s.stream.merge(samples)
}

// Reset reinitializes and clears the list, reusing the samples buffer memory.
func (s *Stream) Reset() {
	s.stream.reset()
	s.b = s.b[:0]
}

// Samples returns the samples currently held by s.
func (s *Stream) Samples() Samples {
	if !s.flushed() {
		return s.b
	}
	s.flush()
	return s.stream.samples()
}

// Count returns the total number of samples observed in the stream
// since initialization.
func (s *Stream) Count() int {
	return len(s.b) + s.stream.count()
}

func (s *Stream) flush() {
	s.maybeSort()
	s.stream.merge(s.b)
	s.b = s.b[:0]
}

func (s *Stream) maybeSort() {
	if !s.sorted {
		s.sorted = true
		sort.Sort(s.b)
	}
}

func (s *Stream) flushed() bool {
	return len(s.stream.l) > 0
}

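// stream is the underlying compressed summary: n is the total observed
// weight, l is the list of samples kept sorted by Value, and ƒ is the
// invariant function that bounds the error each sample may absorb at a given
// rank.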
type stream struct {
	n float64
	l []Sample
	ƒ invariant
}

func (s *stream) reset() {
	s.l = s.l[:0]
	s.n = 0
}

func (s *stream) insert(v float64) {
	s.merge(Samples{{v, 1, 0}})
}

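// merge inserts the given samples, which must already be sorted by Value,
// into the summary at their rank positions and compresses the result. A
// sample inserted before existing ones receives a Delta of
// math.Max(sample.Delta, math.Floor(ƒ(s, r))-1); samples appended at the
// tail keep a Delta of 0.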
func (s *stream) merge(samples Samples) {
	// TODO(beorn7): This tries to merge not only individual samples, but
	// whole summaries. The paper doesn't mention merging summaries at
	// all. Unit tests show that the merging is inaccurate. Find out how to
	// do merges properly.
	var r float64
	i := 0
	for _, sample := range samples {
		for ; i < len(s.l); i++ {
			c := s.l[i]
			if c.Value > sample.Value {
				// Insert at position i.
				s.l = append(s.l, Sample{})
				copy(s.l[i+1:], s.l[i:])
				s.l[i] = Sample{
					sample.Value,
					sample.Width,
					math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
					// TODO(beorn7): How to calculate delta correctly?
				}
				i++
				goto inserted
			}
			r += c.Width
		}
		s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
		i++
	inserted:
		s.n += sample.Width
		r += sample.Width
	}
	s.compress()
}

func (s *stream) count() int {
	return int(s.n)
}

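// query locates the quantile q in the summary: starting from a target rank of
// ceil(q*n), widened by half the allowable error ƒ at that rank, it walks the
// samples and returns the Value of the sample just before the point where the
// accumulated rank plus the next sample's Width and Delta would exceed the
// target.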
func (s *stream) query(q float64) float64 {
	t := math.Ceil(q * s.n)
	t += math.Ceil(s.ƒ(s, t) / 2)
	p := s.l[0]
	var r float64
	for _, c := range s.l[1:] {
		r += p.Width
		if r+c.Width+c.Delta > t {
			return p.Value
		}
		p = c
	}
	return p.Value
}

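// compress walks the summary from right to left and merges a sample into its
// right-hand neighbor whenever their combined Width plus the neighbor's Delta
// still satisfies the invariant ƒ at that rank, keeping the summary's size
// bounded.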
func (s *stream) compress() {
	if len(s.l) < 2 {
		return
	}
	x := s.l[len(s.l)-1]
	xi := len(s.l) - 1
	r := s.n - 1 - x.Width

	for i := len(s.l) - 2; i >= 0; i-- {
		c := s.l[i]
		if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
			x.Width += c.Width
			s.l[xi] = x
			// Remove element at i.
			copy(s.l[i:], s.l[i+1:])
			s.l = s.l[:len(s.l)-1]
			xi--
		} else {
			x = c
			xi = i
		}
		r -= c.Width
	}
}

func (s *stream) samples() Samples {
	samples := make(Samples, len(s.l))
	copy(samples, s.l)
	return samples
}