#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file is based on the PySpark version, which in turn
# was ported from spark/util/StatCounter.scala
from __future__ import division
import copy
import math
import numbers
from collections import namedtuple
from pysparkling.sql.functions import parse
from pysparkling.sql.types import row_from_keyed_values
try:
from numpy import maximum, minimum, sqrt
except ImportError:
maximum = max
minimum = min
sqrt = math.sqrt
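# With numpy available, maximum/minimum/sqrt work elementwise, so a
# StatCounter can also track statistics over numpy arrays; the builtin
# scalar fallbacks are used otherwise.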
class StatCounter(object):
def __init__(self, values=None):
self.n = 0 # Running count of our values
self.mu = 0.0 # Running mean of our values
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)
self.maxValue = float("-inf")
self.minValue = float("inf")
if values:
for v in values:
self.merge(v)
# Add a value into this StatCounter, updating the internal statistics.
def merge(self, value):
delta = value - self.mu
self.n += 1
self.mu += delta / self.n
self.m2 += delta * (value - self.mu)
self.maxValue = maximum(self.maxValue, value)
self.minValue = minimum(self.minValue, value)
return self
# Merge another StatCounter into this one, adding up the
# internal statistics.
def mergeStats(self, other):
if other is self: # reference equality holds
# Avoid overwriting fields in a weird order
return self.mergeStats(other.copy())
if self.n == 0:
self.mu = other.mu
self.m2 = other.m2
self.n = other.n
self.maxValue = other.maxValue
self.minValue = other.minValue
elif other.n != 0:
delta = other.mu - self.mu
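            # Pick the numerically safest mean update: if one side has more
            # than 10x the other's count, adjust from the larger side's mean;
            # otherwise use the weighted average of both means.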
if other.n * 10 < self.n:
self.mu = self.mu + (delta * other.n) / (self.n + other.n)
elif self.n * 10 < other.n:
self.mu = other.mu - (delta * self.n) / (self.n + other.n)
else:
self.mu = ((self.mu * self.n + other.mu * other.n)
/ (self.n + other.n))
self.maxValue = maximum(self.maxValue, other.maxValue)
self.minValue = minimum(self.minValue, other.minValue)
self.m2 += other.m2 + ((delta * delta * self.n * other.n)
/ (self.n + other.n))
self.n += other.n
return self
# Clone this StatCounter
def copy(self):
return copy.deepcopy(self)
def count(self):
return int(self.n)
def mean(self):
return self.mu
def sum(self):
return self.n * self.mu
def min(self):
return self.minValue
def max(self):
return self.maxValue
# Return the variance of the values.
def variance(self):
if self.n == 0:
return float('nan')
return self.m2 / self.n
#
# Return the sample variance, which corrects for bias in estimating
# the variance by dividing by N-1 instead of N.
#
def sampleVariance(self):
if self.n <= 1:
return float('nan')
return self.m2 / (self.n - 1)
# Return the standard deviation of the values.
def stdev(self):
return sqrt(self.variance())
#
# Return the sample standard deviation of the values, which corrects for
# bias in estimating the variance by dividing by N-1 instead of N.
#
def sampleStdev(self):
return sqrt(self.sampleVariance())
def __repr__(self):
return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" %
(self.count(), self.mean(), self.stdev(), self.max(), self.min()))
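# A minimal usage sketch (illustrative values; this helper is not part of
# the original module): values can be added one by one with merge(), and
# partial counters can be combined with mergeStats(), which is what makes
# StatCounter usable as the combiner of a parallel reduce.
def _stat_counter_usage_sketch():
    left = StatCounter([1.0, 2.0, 3.0])
    right = StatCounter([4.0, 5.0])
    combined = left.copy().mergeStats(right)
    # Combining partial counters gives the same result as a single pass
    single = StatCounter([1.0, 2.0, 3.0, 4.0, 5.0])
    assert combined.count() == single.count() == 5
    assert combined.mean() == single.mean() == 3.0
    return combined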
PercentileStats = namedtuple("PercentileStats", ["value", "g", "delta"])
class ColumnStatHelper(object):
"""
This class is used to compute statistics on a column
It computes basic statistics such as count, min, max, average, sample variance and
sample standard deviation
It also approximate percentiles with an implementation based, like Spark, on the algorithm
proposed in the paper "Space-efficient Online Computation of Quantile Summaries"
by Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670)
The result of this algorithm has the following deterministic bound:
If the DataFrame has N elements and if we request the quantile at
probability `p` up to error `err`, then the algorithm will return
a sample `x` from the DataFrame so that the *exact* rank of `x` is
close to (p * N).
More precisely:
floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
Its implementation for simple metrics differs from pysparkling.stat_counter.StatCounter
in order to match Spark behaviour when computing stats on a dataset, among the discrepancies:
- handle min and max for strings
- return min and max as int when computed on int
- name "stddev" the *sample* standard deviation metric
"""
def __init__(self, column, percentiles_relative_error=1 / 10000):
self.column = column
self.count = 0
self.sum_of_values = 0
self.m2 = 0
self.m3 = 0
self.m4 = 0
self.min_value = None
self.max_value = None
# Percentiles-related fields
self.sampled = [] # List of PercentileStats
self.head_sampled = [] # List acting as a buffer of the last added values
self.head_max_size = 50000 # Buffer size after which buffer is added to sampled
self.compression_threshold = 10000 # When to compress sampled
self.percentiles_relative_error = percentiles_relative_error
self.compressed = True
def merge(self, row, schema):
value = self.column.eval(row, schema)
if value is not None:
self.update_counters(value)
if isinstance(value, numbers.Number):
self.update_sample(value)
return self
def update_counters(self, value):
if self.count != 0:
self.min_value = min(self.min_value, value)
self.max_value = max(self.max_value, value)
else:
self.min_value = value
self.max_value = value
try:
self.update_moments(value)
self.sum_of_values += value
except TypeError:
self.sum_of_values = None
self.m2 = None
self.m3 = None
self.m4 = None
self.count += 1
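    # One-pass (Welford-style) updates of the 2nd, 3rd and 4th central
    # moments for a single new value, following the higher-order moment
    # formulas from
    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance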
def update_moments(self, value):
        delta = (value - self.mean) if self.count > 0 else 0
deltaN = delta / (self.count + 1)
self.m2 = self.m2 + delta * (delta - deltaN)
delta2 = delta * delta
deltaN2 = deltaN * deltaN
self.m3 = self.m3 - 3 * deltaN * self.m2 + delta * (delta2 - deltaN2)
self.m4 = (self.m4
- 4 * deltaN * self.m3
- 6 * deltaN2 * self.m2
+ delta * (delta * delta2 - deltaN * deltaN2))
def update_sample(self, value):
self.head_sampled.append(value)
self.compressed = False
if len(self.head_sampled) >= self.head_max_size:
self.add_head_to_sample()
if len(self.sampled) >= self.compression_threshold:
self.compress()
def add_head_to_sample(self):
if not self.head_sampled:
return
count_without_head = self.count - len(self.head_sampled)
sorted_head = sorted(self.head_sampled)
new_samples = []
sample_idx = 0
for ops_idx, current_sample in enumerate(sorted_head):
for s in self.sampled[sample_idx:]:
if s.value <= current_sample:
new_samples.append(s)
sample_idx += 1
else:
break
count_without_head += 1
if not new_samples or (
sample_idx == len(self.sampled) and ops_idx == len(sorted_head) - 1
):
delta = 0
else:
delta = math.floor(2 * self.percentiles_relative_error * count_without_head)
new_samples.append(PercentileStats(value=current_sample, g=1, delta=delta))
new_samples += self.sampled[sample_idx:]
self.sampled = new_samples
self.head_sampled = []
self.compressed = False
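    # Greenwald-Khanna compression: walking from the end, adjacent samples
    # are merged while the combined g plus the later sample's delta stays
    # below merge_threshold(), which preserves the rank-error guarantee
    # documented on the class.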
def compress(self):
merge_threshold = self.merge_threshold()
reverse_compressed_sample = []
head = self.sampled[-1]
for sample1 in self.sampled[-2:0:-1]:
if sample1.g + head.g + head.delta < merge_threshold:
head = PercentileStats(value=head.value, g=head.g + sample1.g, delta=head.delta)
else:
reverse_compressed_sample.append(head)
head = sample1
reverse_compressed_sample.append(head)
current_head = self.sampled[0]
if current_head.value <= head.value and len(self.sampled) > 1:
reverse_compressed_sample.append(current_head)
self.sampled = reverse_compressed_sample[::-1]
self.compressed = True
def merge_threshold(self):
return 2 * self.percentiles_relative_error * self.count
def finalize(self):
if self.head_sampled:
self.add_head_to_sample()
if not self.compressed:
self.compress()
def mergeStats(self, other):
"""
:type other: ColumnStatHelper
"""
self.finalize()
other.finalize()
self.sampled = sorted(self.sampled + other.sampled)
self.compress()
if self.count == 0:
self.max_value = other.max_value
self.min_value = other.min_value
elif other.count != 0:
self.max_value = max(self.max_value, other.max_value)
self.min_value = min(self.min_value, other.min_value)
try:
self.merge_moments(other)
self.sum_of_values += other.sum_of_values
except TypeError:
self.sum_of_values = None
self.m2 = None
self.m3 = None
self.m4 = None
self.count += other.count
return self
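    # Pairwise combination of the central moments m2/m3/m4 of two disjoint
    # groups of observations; these are the parallel counterparts of the
    # one-pass updates in update_moments (see the same Wikipedia article
    # on algorithms for calculating variance).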
def merge_moments(self, other):
n1 = self.count
n2 = other.count
new_count = n1 + n2
delta = other.mean - self.mean
deltaN = delta / new_count if new_count != 0 else 0
new_m2 = self.m2 + other.m2 + delta * deltaN * n1 * n2
new_m3 = (self.m3 + other.m3
+ deltaN * deltaN * delta * n1 * n2 * (n1 - n2)
+ 3 * deltaN * (n1 * other.m2 - n2 * self.m2))
self.m4 = (self.m4 + other.m4
+ deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2)
+ 6 * deltaN * deltaN * (n1 * n1 * other.m2 + n2 * n2 * self.m2)
+ 4 * deltaN * (n1 * other.m3 - n2 * self.m3))
self.m2 = new_m2
self.m3 = new_m3
def get_quantile(self, quantile):
self.finalize()
if not 0 <= quantile <= 1:
raise ValueError("quantile must be between 0 and 1")
if not self.sampled:
return None
if quantile <= self.percentiles_relative_error:
return self.sampled[0].value
if quantile >= 1 - self.percentiles_relative_error:
return self.sampled[-1].value
rank = math.ceil(quantile * self.count)
target_error = self.percentiles_relative_error * self.count
min_rank = 0
for cur_sample in self.sampled:
min_rank += cur_sample.g
max_rank = min_rank + cur_sample.delta
if max_rank - target_error <= rank <= min_rank + target_error:
return cur_sample.value
return self.sampled[-1].value
@property
def mean(self):
if self.count == 0 or self.sum_of_values is None:
return None
return self.sum_of_values / self.count
@property
def variance_pop(self):
if self.count == 0 or self.sum_of_values is None:
return None
return self.m2 / self.count
@property
def variance_samp(self):
if self.count <= 1 or self.sum_of_values is None:
return None
return self.m2 / (self.count - 1)
@property
def variance(self):
if self.count == 0:
return None
return self.variance_samp
@property
def stddev_pop(self):
if self.count == 0 or self.sum_of_values is None:
return None
return math.sqrt(self.variance_pop)
@property
def stddev_samp(self):
if self.count <= 1 or self.sum_of_values is None:
return None
return math.sqrt(self.variance_samp)
@property
def stddev(self):
if self.count == 0:
return None
return self.stddev_samp
@property
def min(self):
if self.count == 0:
return None
return self.min_value
@property
def max(self):
if self.count == 0:
return None
return self.max_value
@property
def sum(self):
if self.count == 0:
return None
return self.sum_of_values
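    # Spark-compatible higher moments: skewness is sqrt(n) * m3 / m2^(3/2)
    # and kurtosis is the excess kurtosis n * m4 / m2^2 - 3, both derived
    # from the central moments maintained by update_moments/merge_moments.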
@property
def skewness(self):
if self.count == 0:
return None
if self.m2 == 0:
return float("nan")
return math.sqrt(self.count) * self.m3 / math.sqrt(self.m2 * self.m2 * self.m2)
@property
def kurtosis(self):
if self.count == 0:
return None
if self.m2 == 0:
return float("nan")
return self.count * self.m4 / (self.m2 * self.m2) - 3
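# A minimal sketch of the quantile machinery (illustrative; this helper is
# not part of the original module). It bypasses the Column layer by feeding
# values straight to update_counters/update_sample, which merge() would
# normally call after evaluating the column against each row.
def _column_stat_quantile_sketch():
    helper = ColumnStatHelper(column=None)  # column is only needed by merge()
    for value in range(1, 1001):
        helper.update_counters(value)
        helper.update_sample(value)
    median = helper.get_quantile(0.5)
    # By the documented bound, with err = 1/10000 and N = 1000 the exact
    # rank of the result lies in [floor(0.4999 * N), ceil(0.5001 * N)],
    # i.e. between 499 and 501 (here, value == rank).
    assert 499 <= median <= 501
    return median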
class RowStatHelper(object):
"""
    Class used to maintain one ColumnStatHelper for each column encountered
    when aggregating a list of Rows
"""
def __init__(self, exprs, percentiles_relative_error=1 / 10000):
self.percentiles_relative_error = percentiles_relative_error
self.column_stat_helpers = {}
self.cols = [parse(e) for e in exprs] if exprs else [parse("*")]
        # As Python < 3.6 does not guarantee dict ordering,
        # we need to keep track of the order in which the columns were added
self.col_names = []
def merge(self, row, schema):
for col in self.cols:
for field in col.output_fields(schema):
col_name = field.name
if col_name not in self.column_stat_helpers:
self.column_stat_helpers[col_name] = ColumnStatHelper(
parse(col_name),
self.percentiles_relative_error
)
self.col_names.append(col_name)
self.column_stat_helpers[col_name].merge(row, schema)
return self
def mergeStats(self, other):
"""
:type other: RowStatHelper
"""
for col_name in other.col_names:
counter = other.column_stat_helpers[col_name]
if col_name in self.column_stat_helpers:
self.column_stat_helpers[col_name] = (
self.column_stat_helpers[col_name].mergeStats(counter)
)
else:
self.column_stat_helpers[col_name] = counter
self.col_names.append(col_name)
return self
def get_as_rows(self, stats=("count", "mean", "stddev", "min", "max")):
"""
        Provide a list of Rows with the same format as the rows of the
        Dataset returned by Dataset.stats()
"""
return [
row_from_keyed_values(
[
("summary", stat)
] + [
(col_name, self.get_stat(self.column_stat_helpers[col_name], stat))
for col_name in self.col_names
]
)
for stat in stats
]
@staticmethod
def get_stat(stats_counter, stat):
if stat in ("count", "mean", "stddev", "min", "max"):
value = getattr(stats_counter, stat)
elif stat.endswith("%"):
try:
percentile = float(stat[:-1]) / 100
except ValueError:
raise ValueError("Unable to parse {0} as a percentile".format(stat))
value = stats_counter.get_quantile(percentile)
else:
raise ValueError("{0} is not a recognised statistic".format(stat))
return RowStatHelper.format_stat(value)
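    # Besides the five standard statistics, get_stat accepts percentile
    # strings such as "25%" or "99.9%", which are converted to a probability
    # and answered through ColumnStatHelper.get_quantile, mirroring the
    # behaviour of Dataset.summary() in Spark.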
def get_col_quantile(self, col, quantile):
if str(col) not in self.column_stat_helpers:
raise Exception("Unable to get quantile for {0}".format(quantile))
quantile = self.column_stat_helpers[str(col)].get_quantile(quantile)
return float(quantile) if quantile is not None else None
@staticmethod
def format_stat(stat):
return str(stat) if stat is not None else None
class CovarianceCounter(object):
def __init__(self, method):
if method != "pearson":
raise ValueError(
"Currently only the calculation of the Pearson Correlation "
"coefficient is supported."
)
self.xAvg = 0.0 # the mean of all examples seen so far in col1
self.yAvg = 0.0 # the mean of all examples seen so far in col2
self.Ck = 0.0 # the co-moment after k examples
self.MkX = 0.0 # sum of squares of differences from the (current) mean for col1
self.MkY = 0.0 # sum of squares of differences from the (current) mean for col2
self.count = 0 # count of observed examples
def add(self, x, y):
deltaX = x - self.xAvg
deltaY = y - self.yAvg
self.count += 1
self.xAvg += deltaX / self.count
self.yAvg += deltaY / self.count
self.Ck += deltaX * (y - self.yAvg)
self.MkX += deltaX * (x - self.xAvg)
self.MkY += deltaY * (y - self.yAvg)
return self
def merge(self, other):
"""
        Merge counters from other partitions. The formulas can be found at:
http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
"""
if other.count > 0:
totalCount = self.count + other.count
deltaX = self.xAvg - other.xAvg
deltaY = self.yAvg - other.yAvg
self.Ck += other.Ck + deltaX * deltaY * self.count / totalCount * other.count
self.xAvg = (self.xAvg * self.count + other.xAvg * other.count) / totalCount
self.yAvg = (self.yAvg * self.count + other.yAvg * other.count) / totalCount
self.MkX += other.MkX + deltaX * deltaX * self.count / totalCount * other.count
self.MkY += other.MkY + deltaY * deltaY * self.count / totalCount * other.count
self.count = totalCount
return self
@property
def covar_samp(self):
"""
Return the sample covariance for the observed examples
"""
if self.count <= 1:
return None
return self.Ck / (self.count - 1)
@property
def covar_pop(self):
"""
        Return the population covariance for the observed examples
"""
if self.count == 0:
return None
return self.Ck / self.count
@property
def pearson_correlation(self):
return self.Ck / math.sqrt(self.MkX * self.MkY)
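# A minimal usage sketch (illustrative values; this helper is not part of
# the original module): feed (x, y) pairs into two counters, merge them as
# a parallel reduce would, and read the combined results.
def _covariance_counter_sketch():
    left = CovarianceCounter("pearson")
    for x, y in [(1.0, 2.0), (2.0, 4.0)]:
        left.add(x, y)
    right = CovarianceCounter("pearson")
    for x, y in [(3.0, 6.0), (4.0, 8.0)]:
        right.add(x, y)
    left.merge(right)
    assert left.count == 4
    # y == 2 * x exactly, so the correlation is 1 (up to rounding)
    assert abs(left.pearson_correlation - 1.0) < 1e-9
    return left.covar_samp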