K-means clustering

Given a dataset of \(N\) samples \(x_i \in \mathbb{R}^d\), the goal of k-means clustering is to partition the samples into \(K\) clusters, where \(K\) is specified in advance.

The algorithm alternates between assigning each sample to its nearest cluster center and updating each cluster center to the mean of the samples assigned to it. The objective is to minimize the sum of squared distances between samples and their respective cluster centers.
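
Formally, with assignments \(c(i) \in \{1, \dots, K\}\) and centers \(\mu_1, \dots, \mu_K\), this objective is

\[
\min_{c,\,\mu} \; \sum_{i=1}^{N} \lVert x_i - \mu_{c(i)} \rVert^2 .
\]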

Pseudocode for k-means clustering:

  1. Initialize K cluster centers randomly or using a heuristic.
  2. Assign each instance to the nearest cluster center.
  3. Update the cluster centers based on the mean of the instances in each cluster.
  4. Repeat steps 2 and 3 until convergence.
  5. Return the final cluster assignments and cluster centers.

import numpy as np


def k_means_clustering(X, K, max_iters=100):
    assert X.shape[0] >= K, "The number of samples must be at least the number of clusters"
    N, d = X.shape

    # Step 1: initialize K cluster centers by picking K distinct samples at random
    centers = X[np.random.choice(N, K, replace=False)]

    def assign_to_center(x, centers):
        # Return the index of the nearest cluster center for a single sample
        return np.argmin([np.linalg.norm(x - center) for center in centers])

    def update_centers(X, assignments, centers):
        # Recompute each center as the mean of its assigned samples;
        # keep the old center if a cluster ends up empty
        return np.array([
            X[assignments == i].mean(axis=0) if np.any(assignments == i) else centers[i]
            for i in range(K)
        ])

    assignments = None
    for _ in range(max_iters):
        # Step 2: assign each sample to the nearest cluster center
        new_assignments = np.array([assign_to_center(x, centers) for x in X])

        # Step 4: stop once the assignments no longer change
        if assignments is not None and np.array_equal(new_assignments, assignments):
            break
        assignments = new_assignments

        # Step 3: update the cluster centers
        centers = update_centers(X, assignments, centers)

    assert centers.shape == (K, d), "There must be exactly K cluster centers"
    return centers, assignments
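
A minimal smoke test on two well-separated Gaussian blobs (the sizes and seed here are illustrative):

np.random.seed(0)
X = np.vstack([np.random.randn(50, 2), np.random.randn(50, 2) + 5.0])
centers, assignments = k_means_clustering(X, K=2)
print(centers.shape)             # (2, 2)
print(np.bincount(assignments))  # roughly [50, 50], one cluster per blob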

Compressed Vector

You are given vectors stored in a compressed, run-length-encoded form, and you need to compute the dot product of two such compressed vectors.
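
For example, each run of repeated values collapses to a (value, count) pair, so the vector [5, 5, 5, 2] is stored as [(5, 3), (2, 1)].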

class CompressedVector:
    def __init__(self, vector: list[int]):
        # Run-length encode the input vector; its length and value range
        # are unconstrained beyond being non-empty
        self.vector = vector
        self.length = len(vector)
        self.check_valid_input()
        self.com_vec = self.assign()

    def check_valid_input(self):
        assert len(self.vector) >= 1, "input vector must be non-empty"

    # Compressed representation: [(value, num_repeat), ...]

    def assign(self):
        output = []

        prev_v = self.vector[0]
        count_of_v = 1

        for v in self.vector[1:]:
            if v == prev_v:
                # extend the current run
                count_of_v += 1
            else:
                # close the current run and start a new one
                output.append((prev_v, count_of_v))
                prev_v = v
                count_of_v = 1

        # close the final run
        output.append((prev_v, count_of_v))
        return output


def dot_product(v1: CompressedVector, v2: CompressedVector) -> int:
    # Compute the dot product directly on the run-length encodings:
    # each pair of overlapping runs contributes value1 * value2 * overlap,
    # so the cost is proportional to the number of runs, not the vector length.
    assert v1.length == v2.length, "vectors must have the same length"

    result = 0
    idx1, idx2 = 0, 0        # indices into the compressed vectors
    rem1 = v1.com_vec[0][1]  # elements left in the current run of v1
    rem2 = v2.com_vec[0][1]  # elements left in the current run of v2

    while idx1 < len(v1.com_vec) and idx2 < len(v2.com_vec):
        val1 = v1.com_vec[idx1][0]
        val2 = v2.com_vec[idx2][0]

        # The current runs overlap for min(rem1, rem2) positions
        overlap = min(rem1, rem2)
        result += val1 * val2 * overlap
        rem1 -= overlap
        rem2 -= overlap

        # Advance to the next run once the current one is exhausted
        if rem1 == 0:
            idx1 += 1
            if idx1 < len(v1.com_vec):
                rem1 = v1.com_vec[idx1][1]
        if rem2 == 0:
            idx2 += 1
            if idx2 < len(v2.com_vec):
                rem2 = v2.com_vec[idx2][1]

    return result
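
A quick check of the class and function above (a minimal usage sketch):

v1 = CompressedVector([1, 1, 2, 2, 2])
v2 = CompressedVector([3, 3, 3, 1, 1])
print(v1.com_vec)           # [(1, 2), (2, 3)]
print(v2.com_vec)           # [(3, 3), (1, 2)]
print(dot_product(v1, v2))  # 1*3 + 1*3 + 2*3 + 2*1 + 2*1 = 16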

Uniform Batch Balancing

Implement a function that samples batches of training-example indices while maintaining a uniform distribution over classes in each batch. The implementation must have the following properties:

  1. Indices in batches are randomised.
  2. All batches have the same size equal to batch_size.
  3. No index can appear twice in the same batch.
  4. Classes appear in batches uniformly.
import random
from typing import List, Iterator


def sample_uniform_batches(labels: List[str], batch_size: int) -> Iterator[List[int]]:
    """
    Sample random batches of training example indices while maintaining a uniform distribution over class frequencies in each batch. 

    The implementation must have the following properties:
        1. Indices in batches are randomised.
        2. All batches have the same size equal to `batch_size`.
        3. No index can appear twice in the same batch.
        4. Classes appear in batches uniformly. 
        
    Example:
        >>> iterator = sample_uniform_batches(labels=['a', 'b', 'a'], batch_size=2)
        >>> next(iterator)
        [1, 0]
        >>> next(iterator)
        [1, 2]
        >>> next(iterator)
        [0, 1]

    Args:
        labels: Class labels of training examples
        batch_size: Number of indices in each batch

    Returns:
        An infinite iterator over batches
    """
    # Group example indices by class label
    class_to_indices = {}
    for i, label in enumerate(labels):
        class_to_indices.setdefault(label, []).append(i)

    classes = list(class_to_indices.keys())

    # Property 3 can only hold if enough distinct indices exist for one batch
    assert batch_size <= len(labels), "batch_size must not exceed the number of examples"

    while True:
        # Randomise the class order and the indices within each class
        random.shuffle(classes)
        for class_ in classes:
            random.shuffle(class_to_indices[class_])

        # Round-robin over the shuffled classes so per-batch class counts
        # differ by at most one; skip classes that have run out of indices
        batch = []
        next_pos = {class_: 0 for class_ in classes}
        while len(batch) < batch_size:
            for class_ in classes:
                if len(batch) == batch_size:
                    break
                pos = next_pos[class_]
                if pos < len(class_to_indices[class_]):
                    batch.append(class_to_indices[class_][pos])
                    next_pos[class_] = pos + 1

        yield batch
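
A quick sanity check against the documented properties (actual batches vary with the random state):

it = sample_uniform_batches(labels=['a', 'b', 'a'], batch_size=2)
for _ in range(3):
    batch = next(it)
    assert len(batch) == 2        # property 2: full batch
    assert len(set(batch)) == 2   # property 3: no repeated index within a batch
    print(batch)                  # e.g. [1, 0]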