2 분 소요

DeepWalk를 별도의 그래프 라이브러리 없이 numpy만으로 직접 구현하고, 그 코드를 파트별(Random Walk, Softmax, Hierarchical softmax, SkipGram)로 나누어 정리한 글입니다.

Random Walk

class DeepWalk:
    """Excerpt of the DeepWalk class showing only the random-walk generator."""
    ...
    def random_walking(self, node: int):
        """Generate one random walk that starts at ``node``.

        At each step the next node is drawn uniformly from the neighbours of
        the current node (``self.linked[current_node]``) with the start node
        removed from the candidates. When no candidate is left, the walk
        steps to the start node instead.

        Args:
            node: Start node; also a key of ``self.linked``.

        Returns:
            A list beginning with ``node`` followed by the ``self.wl - 1``
            nodes visited afterwards.
        """
        random_walks = [node]
        current_node = node
        # Loop-invariant: the start node is what gets excluded at every step.
        excluded = np.array([node])

        for _ in range(self.wl - 1):
            candidates = np.setdiff1d(self.linked[current_node], excluded)
            if candidates.size == 0:
                next_node = node
            else:
                # One uniform draw. The previous form sampled
                # ``candidates.size`` values with replacement and discarded
                # all but the first, which has the same distribution.
                next_node = np.random.choice(candidates)
            random_walks.append(next_node)
            current_node = next_node
        return random_walks
    ...

Softmax

class DeepWalk:
    """Excerpt of the DeepWalk class showing only the full-softmax update."""
    ...
    def pure_update_function(self, opened_window, random_walks, v_j, w_mat, in_hi_vec, in_vec, target_node):
        """Apply one full-softmax skip-gram update for a single target node.

        For every window position whose node differs from ``v_j``, the
        prediction error against that context node is used to update
        ``self.phi[target_node]`` and ``self.second_phi`` in place, one loss
        value is appended to ``self.loss_per_iter`` and ``self.calcul_num``
        is incremented.

        Args:
            opened_window: Indices into ``random_walks`` to use as context.
            random_walks: Sequence of node ids of the current walk.
            v_j: Target node id; context positions equal to it are skipped.
            w_mat: Input embedding matrix used only for the reported loss.
            in_hi_vec: Hidden vector of the target, shape ``(self.d, 1)``.
            in_vec: Input column vector indexed by node id
                (presumably one-hot for ``target_node`` - confirm with caller).
            target_node: Row of ``self.phi`` that receives the update.
        """
        # Snapshot of the output weights, used only for the reported loss.
        w_p_mat = self.second_phi.copy()

        # Hidden to output scores; (nodes_num, 1)
        hi_ou_vec = np.transpose(self.second_phi) @ in_hi_vec

        # Softmax with the maximum subtracted first so np.exp cannot
        # overflow to inf (which would turn every update below into NaN).
        shift = np.max(hi_ou_vec)
        exponential = np.exp(hi_ou_vec - shift)
        total_sum = np.sum(exponential)
        y_j = exponential / total_sum
        # log(sum(exp(scores))), recovered from the shifted sum.
        log_partition = shift + np.log(total_sum)

        # k loop : pairwise output loop
        for k in opened_window:
            context_node = random_walks[k]
            if context_node == v_j:
                continue

            # One-hot target for the context node; (nodes_num, 1)
            t_j = np.zeros((self.nodes_num, 1))
            t_j[context_node] = 1

            # Scores used for the reported loss; (nodes_num, 1)
            # NOTE(review): these are built from the context node's row of
            # w_mat rather than from in_hi_vec - confirm this is intended.
            u_j_star = np.transpose(w_mat[context_node]).reshape((self.d, 1))
            u_j_star = np.transpose(w_p_mat) @ u_j_star

            # Prediction error; (nodes_num, 1)
            e_j = y_j - t_j

            # Gradient for the output weights; (d, nodes_num)
            e_sp = in_hi_vec @ np.transpose(e_j)

            # Error propagated back to the hidden layer; (d, 1).
            # Uses the live self.second_phi, which earlier iterations of
            # this loop have already updated.
            eh_i = self.second_phi @ e_j

            # Row ``target_node`` of in_vec @ eh_i.T, computed directly
            # instead of building the whole (nodes_num, d) product; (d,)
            e_fp = in_vec[target_node] @ np.transpose(eh_i)

            # Error Function Partial Derivatives
            self.phi[target_node] -= self.lr * e_fp
            self.second_phi -= self.lr * e_sp
            loss = log_partition - u_j_star
            self.loss_per_iter.append(loss[context_node])
            self.calcul_num += 1
    ...

Hierarchical softmax

class DeepWalk:
    """Excerpt of the DeepWalk class showing only the hierarchical-softmax update."""
    ...
    def hierarchical_update_function(self, opened_window, random_walks, v_j, inner_unit_mat, in_hi_vec, target_node):
        """Apply one hierarchical-softmax update for a single target node.

        For every window position whose node differs from ``v_j``, the inner
        units listed in ``self.node_path_dict[context_node]`` are updated in
        ``self.inner_unit_matrix`` and ``self.phi[target_node]`` is updated,
        both in place. The summed loss over the path is appended to
        ``self.loss_per_iter`` and ``self.calcul_num`` is incremented.

        Args:
            opened_window: Indices into ``random_walks`` to use as context.
            random_walks: Sequence of node ids of the current walk.
            v_j: Target node id; context positions equal to it are skipped.
            inner_unit_mat: Inner-unit vectors read for the forward pass,
                one column per inner unit.
            in_hi_vec: Hidden vector of the target, shape ``(d, 1)``.
            target_node: Row of ``self.phi`` that receives the update.
        """
        # k loop : pairwise output loop
        for k in opened_window:
            context_node = random_walks[k]
            if context_node == v_j:
                continue

            # Per-path-step multiplier for this context node
            # (assumes it broadcasts as (paths, 1) - TODO confirm).
            channel = self.ch[context_node]

            # H-softmax: column indices of the inner units on the path
            p_l_decimal = self.node_path_dict[context_node]

            # (d, paths)
            p_l = inner_unit_mat[:, p_l_decimal]

            # (paths, 1)
            path_l = self.sigmoid(channel * np.transpose(p_l) @ in_hi_vec)
            loss = -np.log(path_l)

            # (1, paths); broadcasting against the (d, paths) and (d, 1)
            # operands below replaces the former np.tile(..., (2, 1)),
            # which only worked for an embedding dimension of exactly 2.
            e_v_l_h = np.transpose(channel * (path_l - 1))

            # (d, paths) each
            eh_l = e_v_l_h * p_l
            e_v_l = e_v_l_h * in_hi_vec

            self.inner_unit_matrix[:, p_l_decimal] -= self.lr * e_v_l
            self.phi[target_node] -= self.lr * np.sum(eh_l, axis=1)
            self.loss_per_iter.append(np.sum(loss))
            self.calcul_num += 1

SkipGram

class DeepWalk:
    """Excerpt of the DeepWalk class showing only the skip-gram driver."""
    ...
    def skip_gram(self, random_walks: list):
        """Run one skip-gram pass over a walk, one update call per position.

        For each position of the walk, a context window of up to ``self.ws``
        positions on each side (clipped to ``[0, self.wl)``) is built and
        passed to the update routine selected by ``self.method``
        (``'Hierarchical'`` or ``'Pure'``). Any other value of
        ``self.method`` results in no update call.

        Args:
            random_walks: Node ids of the walk, in visiting order.
        """
        walk = np.array(random_walks)
        half_width = self.ws
        walk_length = self.wl

        # j loop : input loop
        for position, v_j in enumerate(walk):
            # Snapshot of phi so that the update and the loss computation
            # do not influence each other.
            frozen_phi = self.phi.copy()

            # One-hot encoding of the target; (nodes_num, 1)
            target_node = v_j
            in_vec = np.zeros((self.nodes_num, 1), dtype=np.int32)
            in_vec[target_node] = 1

            # Input to hidden vector; (d, 1)
            in_hi_vec = frozen_phi[target_node].reshape((self.d, 1))

            # Positions the window looks at, clipped to the walk bounds.
            first = max(position - half_width, 0)
            stop = min(position + half_width + 1, walk_length)
            opened_window = np.arange(first, stop, dtype=np.int32)

            if self.method == 'Pure':
                self.pure_update_function(opened_window, walk, v_j, frozen_phi,
                                          in_hi_vec, in_vec, target_node)
            elif self.method == 'Hierarchical':
                frozen_inner = self.inner_unit_matrix.copy()
                self.hierarchical_update_function(opened_window, walk, v_j,
                                                  frozen_inner, in_hi_vec, target_node)
    ...

댓글남기기