# Copyright (C) 2017-2022 Cleanlab Inc.
# This file is part of cleanlab.
#
# cleanlab is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cleanlab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with cleanlab. If not, see <https://www.gnu.org/licenses/>.
# ## Noise Generation
#
# #### Contains methods for generating valid (learning with noise is possible) noise matrices, generating noisy labels given a noise matrix, generating valid noise matrices with a specific trace value, and more.
from __future__ import print_function, absolute_import, division, \
unicode_literals, with_statement
import numpy as np
from cleanlab.util import value_counts, confusion_matrix
import warnings
[docs]def noise_matrix_is_valid(noise_matrix, py, verbose=False):
"""Given a prior py = p(y=k), returns true if the given noise_matrix is a
learnable matrix. Learnability means that it is possible to achieve
better than random performance, on average, for the amount of noise in
noise_matrix. """
# Number of classes
K = len(py)
# Let's assume some number of training examples for code readability,
# but it doesn't matter what we choose as its not actually used.
N = float(10000)
ps = np.dot(noise_matrix, py) # P(y=k)
# P(s=k, y=k')
joint_noise = np.multiply(noise_matrix, py) # / float(N)
# Check that joint_probs is valid probability matrix
if not (abs(joint_noise.sum() - 1.0) < 1e-6):
return False
# Check that noise_matrix is a valid matrix
# i.e. check p(s=k)*p(y=k) < p(s=k, y=k)
for i in range(K):
C = N * joint_noise[i][i]
E1 = N * joint_noise[i].sum() - C
E2 = N * joint_noise.T[i].sum() - C
O = N - E1 - E2 - C
if verbose:
print(
"E1E2/C", round(E1 * E2 / C),
"E1", round(E1),
"E2", round(E2),
"C", round(C),
"|", round(E1 * E2 / C + E1 + E2 + C),
"|", round(E1 * E2 / C), "<", round(O),
)
print(
round(ps[i] * py[i]), "<", round(joint_noise[i][i]),
":", ps[i] * py[i] < joint_noise[i][i],
)
if not (ps[i] * py[i] < joint_noise[i][i]):
return False
return True
[docs]def generate_noisy_labels(y, noise_matrix, verbose=False):
"""Generates noisy labels s (shape (N, 1)) from perfect labels y,
'exactly' yielding the provided noise_matrix between s and y.
Below we provide a for loop implementation of what this function does.
We do not use this implementation as it is not a fast algorithm, but
it explains as Python pseudocode what is happening in this function.
Parameters
----------
y : np.array (shape (N, 1))
Perfect labels, without any noise. Contains K distinct natural number
classes, e.g. 0, 1,..., K-1
noise_matrix : np.array of shape (K, K), K = number of classes
A conditional probablity matrix of the form P(s=k_s|y=k_y) containing
the fraction of examples in every class, labeled as every other class.
Assumes columns of noise_matrix sum to 1.
Examples
--------
.. code:: python
# Generate s
count_joint = (noise_matrix * py * len(y)).round().astype(int)
s = np.array(y)
for k_s in range(K):
for k_y in range(K):
if k_s != k_y:
idx_flip = np.where((s==k_y)&(y==k_y))[0]
if len(idx_flip): # pragma: no cover
s[np.random.choice(
idx_flip,
count_joint[k_s][k_y],
replace=False,
)] = k_s
"""
# Make y a numpy array, if it is not
y = np.asarray(y)
# Number of classes
K = len(noise_matrix)
# Compute p(y=k)
py = value_counts(y) / float(len(y))
# Counts of pairs (s, y)
count_joint = (noise_matrix * py * len(y)).astype(int)
# Remove diagonal entries as they do not involve flipping of labels.
np.fill_diagonal(count_joint, 0)
# Generate s
s = np.array(y)
for k in range(K): # Iterate over true class y == k
# Get the noisy s labels that have non-zero counts
s_labels = np.where(count_joint[:, k] != 0)[0]
# Find out how many of each noisy s label we need to flip to
s_counts = count_joint[s_labels, k]
# Create a list of the new noisy labels
noise = [s_labels[i] for i, c in enumerate(s_counts) for z in range(c)]
# Randomly choose y labels for class k and set them to the noisy labels.
idx_flip = np.where((s == k) & (y == k))[0]
if len(idx_flip) and len(noise) and len(idx_flip) >= len(
noise): # pragma: no cover
s[np.random.choice(idx_flip, len(noise), replace=False)] = noise
# Validate that s indeed produces the correct noise_matrix (or close to it)
# Compute the actual noise matrix induced by s
# counts = confusion_matrix(s, y).astype(float)
# new_noise_matrix = counts / counts.sum(axis=0)
# assert(np.linalg.norm(noise_matrix - new_noise_matrix) <= 2)
return s
[docs]def generate_noise_matrix_from_trace(
K,
trace,
max_trace_prob=1.0,
min_trace_prob=1e-5,
max_noise_rate=1 - 1e-5,
min_noise_rate=0.0,
valid_noise_matrix=True,
py=None,
frac_zero_noise_rates=0.,
seed=0,
max_iter=10000,
):
"""Generates a K x K noise matrix P(s=k_s|y=k_y) with trace
as the np.mean(np.diagonal(noise_matrix)).
Parameters
----------
K : int
Creates a noise matrix of shape (K, K). Implies there are
K classes for learning with noisy labels.
trace : float (0.0, 1.0]
Sum of diagonal entries of np.array of random probabilities returned.
max_trace_prob : float (0.0, 1.0]
Maximum probability of any entry in the trace of the return matrix.
min_trace_prob : float [0.0, 1.0)
Minimum probability of any entry in the trace of the return matrix.
max_noise_rate : float (0.0, 1.0]
Maximum noise_rate (non-diagonal entry) in the returned np.array.
min_noise_rate : float [0.0, 1.0)
Minimum noise_rate (non-diagonal entry) in the returned np.array.
valid_noise_matrix : bool
If True, returns a matrix having all necessary conditions for
learning with noisy labels. In particular, p(y=k)p(s=k) < p(y=k,s=k)
is satisfied. This requires that Trace > 1.
py : np.array (shape (K, 1))
Fraction (prior probability) of each true/hidden class label, P(y = k).
REQUIRED when valid_noise_matrix == True.
frac_zero_noise_rates : float
The fraction of the n*(n-1) noise rates
that will be set to 0. Note that if you set a high trace, it may be
impossible to also have a low fraction of zero noise rates without
forcing all non-"1" diagonal values. Instead, when this happens we only
guarantee to produce a noise matrix with frac_zero_noise_rates **or
higher**. The opposite occurs with a small trace.
seed : int
Seeds the random number generator for numpy.
max_iter : int (default: 10000)
The max number of tries to produce a valid matrix before returning False.
Returns
-------
np.array (shape (K, K))
noise matrix P(s=k_s|y=k_y) with trace
as the np.sum(np.diagonal(noise_matrix)).
This a conditional probability matrix and a
left stochastic matrix."""
if valid_noise_matrix and trace <= 1:
raise ValueError(
"trace = {}. trace > 1 is necessary for a".format(trace) +
" valid noise matrix to be returned (valid_noise_matrix == True)")
if valid_noise_matrix and py is None and K > 2:
raise ValueError(
"py must be provided (not None) if the input parameter" +
" valid_noise_matrix == True")
if K <= 1:
raise ValueError('K must be >= 2, but K = {}.'.format(K))
if max_iter < 1:
return False
np.random.seed(seed)
# Special (highly constrained) case with faster solution.
# Every 2 x 2 noise matrix with trace > 1 is valid because p(y) is not used
if K == 2:
if frac_zero_noise_rates >= 0.5: # Include a single zero noise rate
noise_mat = np.array([
[1., 1 - (trace - 1.)],
[0., trace - 1.],
])
return noise_mat if np.random.rand() > 0.5 else np.rot90(noise_mat,
k=2)
else: # No zero noise rates
diag = generate_n_rand_probabilities_that_sum_to_m(2, trace)
noise_matrix = np.array([
[diag[0], 1 - diag[1]],
[1 - diag[0], diag[1]],
])
return noise_matrix
# K > 2
for z in range(max_iter):
noise_matrix = np.zeros(shape=(K, K))
# Randomly generate noise_matrix diagonal.
nm_diagonal = generate_n_rand_probabilities_that_sum_to_m(
n=K,
m=trace,
max_prob=max_trace_prob,
min_prob=min_trace_prob,
)
np.fill_diagonal(noise_matrix, nm_diagonal)
# Randomly distribute number of zero-noise-rates across columns
num_col_with_noise = K - np.count_nonzero(1 == nm_diagonal)
num_zero_noise_rates = int(K * (K - 1) * frac_zero_noise_rates)
# Remove zeros already in [1,0,..,0] columns
num_zero_noise_rates -= (K - num_col_with_noise) * (K - 1)
num_zero_noise_rates = np.maximum(num_zero_noise_rates,
0) # Prevent negative
num_zero_noise_rates_per_col = randomly_distribute_N_balls_into_K_bins(
N=num_zero_noise_rates,
K=num_col_with_noise,
max_balls_per_bin=K - 2,
# 2 = one for diagonal, and one to sum to 1
min_balls_per_bin=0,
) if K > 2 else np.array([0, 0]) # Special case when K == 2
stack_nonzero_noise_rates_per_col = list(
K - 1 - num_zero_noise_rates_per_col)[::-1]
# Randomly generate noise rates for columns with noise.
for col in np.arange(K)[nm_diagonal != 1]:
num_noise = stack_nonzero_noise_rates_per_col.pop()
# Generate num_noise noise_rates for the given column.
noise_rates_col = list(generate_n_rand_probabilities_that_sum_to_m(
n=num_noise,
m=1 - nm_diagonal[col],
max_prob=max_noise_rate,
min_prob=min_noise_rate,
))
# Randomly select which rows of the noisy column to assign the
# random noise rates
rows = np.random.choice([row for row in range(K) if row != col],
num_noise, replace=False)
for row in rows:
noise_matrix[row][col] = noise_rates_col.pop()
if not valid_noise_matrix or noise_matrix_is_valid(noise_matrix, py):
break
return noise_matrix
[docs]def generate_n_rand_probabilities_that_sum_to_m(
n,
m,
max_prob=1.0,
min_prob=0.0,
):
"""When min_prob=0 and max_prob = 1.0, this method is deprecated.
Instead use np.random.dirichlet(np.ones(n))*m
Generates 'n' random probabilities that sum to 'm'.
Parameters
----------
n : int
Length of np.array of random probabilities to be returned.
m : float
Sum of np.array of random probabilities that is returned.
max_prob : float (0.0, 1.0] | Default value is 1.0
Maximum probability of any entry in the returned np.array.
min_prob : float [0.0, 1.0) | Default value is 0.0
Minimum probability of any entry in the returned np.array."""
epsilon = 1e-6 # Imprecision allowed for inequalities with floats
if n == 0:
return np.array([])
if (max_prob + epsilon) < m / float(n):
raise ValueError("max_prob must be greater or equal to m / n, but " +
"max_prob = " + str(max_prob) + ", m = " + str(
m) + ", n = " +
str(n) + ", m / n = " + str(m / float(n)))
if min_prob > (m + epsilon) / float(n):
raise ValueError("min_prob must be less or equal to m / n, but " +
"max_prob = " + str(max_prob) + ", m = " + str(
m) + ", n = " +
str(n) + ", m / n = " + str(m / float(n)))
# When max_prob = 1, min_prob = 0, the next two lines are equivalent to:
# intermediate = np.sort(np.append(np.random.uniform(0, 1, n-1), [0, 1]))
# result = (intermediate[1:] - intermediate[:-1]) * m
result = np.random.dirichlet(np.ones(n)) * m
min_val = min(result)
max_val = max(result)
while max_val > (max_prob + epsilon):
new_min = min_val + (max_val - max_prob)
# This adjustment prevents the new max from always being max_prob.
adjustment = (max_prob - new_min) * np.random.rand()
result[np.argmin(result)] = new_min + adjustment
result[np.argmax(result)] = max_prob - adjustment
min_val = min(result)
max_val = max(result)
min_val = min(result)
max_val = max(result)
while min_val < (min_prob - epsilon):
min_val = min(result)
max_val = max(result)
new_max = max_val - (min_prob - min_val)
# This adjustment prevents the new min from always being min_prob.
adjustment = (new_max - min_prob) * np.random.rand()
result[np.argmax(result)] = new_max - adjustment
result[np.argmin(result)] = min_prob + adjustment
min_val = min(result)
max_val = max(result)
return result
[docs]def randomly_distribute_N_balls_into_K_bins(
N, # int
K, # int
max_balls_per_bin=None,
min_balls_per_bin=None,
):
"""Returns a uniformly random numpy integer array of length N that sums
to K.
Parameters
----------
N : int
K : int
max_balls_per_bin : int
min_balls_per_bin : int"""
if N == 0:
return np.zeros(K, dtype=int)
if max_balls_per_bin is None:
max_balls_per_bin = N
else:
max_balls_per_bin = min(max_balls_per_bin, N)
if min_balls_per_bin is None:
min_balls_per_bin = 0
else:
min_balls_per_bin = min(min_balls_per_bin, N / K)
if N / float(K) > max_balls_per_bin:
N = max_balls_per_bin * K
arr = np.round(generate_n_rand_probabilities_that_sum_to_m(
n=K,
m=1,
max_prob=max_balls_per_bin / float(N),
min_prob=min_balls_per_bin / float(N),
) * N)
while sum(arr) != N:
while sum(arr) > N: # pragma: no cover
arr[np.argmax(arr)] -= 1
while sum(arr) < N:
arr[np.argmin(arr)] += 1
return arr.astype(int)
# Deprecated functions below
[docs]def generate_noise_matrix(
K,
max_noise_rate=1.0,
frac_zero_noise_rates=0.0,
verbose=False,
):
"""DEPRECATED - Use generate_noise_matrix_from_trace()
Generates a noise matrix by randomly assigning noise rates
up to max_noise_rate, then setting noise rates to
zero until P(s!=k|s=k) < 1 is satisfied. Additionally,
frac_zero_noise_rates are set to zero.
Parameters
----------
K : int
Creates a noise matrix of shape (K, K). Implies there are
K classes for learning with noisy labels.
max_noise_rate : float
Smaller ---> easier learning problem (less noise)
frac_zero_noise_rates : float
Make problem more tractable by making a fraction of noise rates zero.
Larger --> Easier learning problem
verbose : bool
Print debugging output if set to True."""
warnings.warn(
"This method is deprecated. Use 'generate_noise_matrix_from_trace' "
"instead.",
DeprecationWarning,
)
# Init noise matrix to be random values from (0, max_noise_rate)
# P(s=k|y=k')
noise_matrix = np.random.rand(K, K) * max_noise_rate
# Round all noise rates
noise_matrix = noise_matrix.round(2)
# Initialize all P(s=k|y=k) = 0
for i in range(K):
noise_matrix[i][i] = 0.0
# Compute sum for each column
col_sum = noise_matrix.sum(axis=0)
# For each column, randomly set noise rates to zero until P(s!=k|s=k) < 1.
for y in range(K): # col
col = noise_matrix.T[y]
col_sum = np.sum(col)
while col_sum >= 1:
non_zero_indices = np.arange(K)[col != 0]
s = np.random.choice(non_zero_indices)
noise_matrix[s][y] = 0.0
col = noise_matrix.T[y]
col_sum = np.sum(col)
# Set frac_zero_noise_rates of the noise rates to 0 for increased
# tractability.
for s in range(K):
for y in range(K):
if np.random.rand() < frac_zero_noise_rates:
noise_matrix[s][y] = 0
# Compute sum for each column
col_sum = noise_matrix.sum(axis=0)
# Normalize each column such that P(s=k|y=k) = 1 - P(s!=k|s=k)
for i in range(K):
noise_matrix[i][i] = 1 - col_sum[i]
if verbose:
print("Average trace of noise matrix is",
np.trace(noise_matrix) / float(K))
return noise_matrix