diff --git a/scripts/builtin/autoGradientCheck.dml b/scripts/builtin/autoGradientCheck.dml new file mode 100644 index 00000000000..eebcbba7600 --- /dev/null +++ b/scripts/builtin/autoGradientCheck.dml @@ -0,0 +1,92 @@ +# gradient_check.dml +# Usage: +# systemds gradient_check.dml -exec singlenode -nvargs X=... H1=16 H2=8 H3=4 LAYER=1 R=1 C=1 EPS=1e-4 + +source("scripts/builtin/autoencoder_2layer.dml") as ae + +X = read($X) +n = nrow(X) +m = ncol(X) + +# ---- hidden layers list (same convention as wrapper) +hidden_layers = list(as.integer($H1)) +if(as.integer($H2) > 0) + hidden_layers = append(hidden_layers, list(as.integer($H2))) +if(as.integer($H3) > 0) + hidden_layers = append(hidden_layers, list(as.integer($H3))) + +# ---- deterministic-ish permutation input +# If you want strict determinism, call setseed(...) before Rand in SystemDS. +order_rand = Rand(rows=n, cols=1, min=0, max=1, pdf="uniform") +permut = table(seq(1,n,1), + order(target=order_rand, by=1, index.return=TRUE), + n, n) + +X_pre = permut %*% X +means = colSums(X_pre)/n +stds = sqrt((colSums(X_pre^2)/n - means*means)*n/(n-1)) + 1e-17 +X_pre = (X_pre - means)/stds + +# ---- init model +layer_sizes = ae::build_layer_sizes(m, hidden_layers) +layer_count = length(layer_sizes) - 1 +[W_list0, b_list0, updW0, updb0] = ae::init_layers(layer_sizes) + +# ---- forward + backward (analytic grad) +[act0, primes0, Yhat0, E0] = ae::forward_pass_layers(X_pre, W_list0, b_list0, X_pre) +[grads_W, grads_b] = ae::backward_pass_layers(X_pre, act0, primes0, W_list0, E0) + +# grads_W is stored in reverse order: grads_W[1] corresponds to last layer +target_layer = as.integer($LAYER) # 1..layer_count (1=first layer) +idx = layer_count - target_layer + 1 +G = as.matrix(grads_W[idx]) + +r = as.integer($R) +c = as.integer($C) +eps = as.double($EPS) + +g_analytic = as.double(G[r,c]) + +# ---- helper: build W_list with one layer replaced +replace_layer = function(list[unknown] W_list, Integer k, Matrix[Double] Wnew) + return(list[unknown] W_out) +{ + W_out = list() + for(i in 1:length(W_list)) { + if(i == k) W_out = append(W_out, list(Wnew)) + else W_out = append(W_out, list(as.matrix(W_list[i]))) + } +} + +# ---- finite difference: perturb W[target_layer][r,c] by +/- eps +Wk = as.matrix(W_list0[target_layer]) +nr = nrow(Wk); nc = ncol(Wk) + +# sparse perturb matrix with eps at (r,c) +ri = matrix(as.double(r), rows=1, cols=1) +ci = matrix(as.double(c), rows=1, cols=1) +vi = matrix(eps, rows=1, cols=1) +D = table(ri, ci, vi, nr, nc) + +W_plus = Wk + D +W_minus = Wk - D + +W_list_plus = replace_layer(W_list0, target_layer, W_plus) +W_list_minus = replace_layer(W_list0, target_layer, W_minus) + +[_, _, _, E_plus] = ae::forward_pass_layers(X_pre, W_list_plus, b_list0, X_pre) +[_, _, _, E_minus] = ae::forward_pass_layers(X_pre, W_list_minus, b_list0, X_pre) + +obj_plus = ae::obj(E_plus) +obj_minus = ae::obj(E_minus) + +g_numeric = (obj_plus - obj_minus) / (2 * eps) + +# ---- report +den = max(1e-12, abs(g_numeric) + abs(g_analytic)) +rel_err = abs(g_numeric - g_analytic) / den + +print("GRADCHECK layer=" + target_layer + " r=" + r + " c=" + c) +print(" analytic = " + g_analytic) +print(" numeric = " + g_numeric) +print(" rel_err = " + rel_err) diff --git a/scripts/builtin/autoencoderGeneralized.dml b/scripts/builtin/autoencoderGeneralized.dml new file mode 100644 index 00000000000..1c84050d65b --- /dev/null +++ b/scripts/builtin/autoencoderGeneralized.dml @@ -0,0 +1,110 @@ +#------------------------------------------------------------- +# +# Licensed to the 
Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- +#------------------------------------------------------------- +# Apache SystemDS - autoencoder wrapper +# Prints stable BEFORE/AFTER objective lines for parsing +# and optionally writes outputs (skips if /dev/null). +#------------------------------------------------------------- +source("scripts/builtin/autoencoder_2layer.dml") as ae + +# --------- read inputs --------- +X = read($X) + +# Build encoder hidden layer list +hidden_layers = list(as.integer($H1)) +if(as.integer($H2) > 0) + hidden_layers = append(hidden_layers, list(as.integer($H2))) +if(as.integer($H3) > 0) + hidden_layers = append(hidden_layers, list(as.integer($H3))) + +# --------- compute objective BEFORE training --------- +# I compute it on a standardized + permuted copy, consistent with training. +# Create a fixed permutation seed via order_rand so both pre and training match. +n = nrow(X) +order_rand = Rand(rows=n, cols=1, min=0, max=1, pdf="uniform") +permut = table( + seq(1, n, 1), + order(target=order_rand, by=1, index.return=TRUE), + n, n +) +X_pre = permut %*% X + +means = colSums(X_pre) / n +stds = sqrt((colSums(X_pre^2)/n - means*means) * n/(n-1)) + 1e-17 +X_pre = (X_pre - means) / stds + +# Initialize a model once which same initializer used by training +layer_sizes = ae::build_layer_sizes(ncol(X_pre), hidden_layers) +[W_list0, b_list0, updW0, updb0] = ae::init_layers(layer_sizes) + +# Forward + objective +[act0, primes0, Yhat0, E0] = ae::forward_pass_layers(X_pre, W_list0, b_list0, X_pre) +obj_before = ae::obj(E0) +print("WRAPPER FULL OBJ BEFORE TRAIN = " + obj_before) + +# --------- train --------- +# IMPORTANT: pass order_rand so training uses same permutation as X_pre +[model, hidden] = ae::m_autoencoder( + X=X, + hidden_layers=hidden_layers, + max_epochs=as.integer($EPOCH), + full_obj=as.boolean($FULL_OBJ), + batch_size=as.integer($BATCH), + step=as.double($STEP), + decay=as.double($DECAY), + mu=as.double($MOMENTUM), + method=$METHOD, + mode=$MODE, + utype=$UTYPE, + freq=$FREQ, + k=as.integer($WORKERS), + scheme=$SCHEME, + nbatches=as.integer($NBATCHES), + modelAvg=as.boolean($MODELAVG), + order_rand=order_rand +) + +# --------- compute objective AFTER training on SAME X_pre --------- +encoder_layers = length(hidden_layers) +layer_count = 2 * encoder_layers + +W_list1 = list() +b_list1 = list() + +# model layout in generalized code path is: +# [W1..WL, b1..bL, updW1..updWL, updb1..updbL] +for(i in 1:layer_count) + W_list1 = append(W_list1, list(as.matrix(model[i]))) +for(i in 1:layer_count) + b_list1 = append(b_list1, list(as.matrix(model[layer_count + i]))) + +[act1, primes1, Yhat1, E1] = ae::forward_pass_layers(X_pre, W_list1, b_list1, X_pre) +obj_after = ae::obj(E1) 
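# Illustrative note (not part of the original script): the manual unflatten loop
# above follows the [W1..WL, b1..bL, updW1..updWL, updb1..updbL] layout, so with
# two hidden layers (layer_count = 4) model[1..4] are W1..W4 and model[5..8] are
# b1..b4. The helper defined in autoencoder_2layer.dml could be used equivalently
# (tmp_updW/tmp_updb are illustrative local names):
#   [W_list1, b_list1, tmp_updW, tmp_updb] = ae::unflatten_model(model, layer_count)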
+print("WRAPPER FULL OBJ AFTER TRAIN = " + obj_after) + +# --------- optional outputs skip if /dev/null --------- +W1_out = as.matrix(model[1]) +Wlast_out = as.matrix(model[layer_count]) +hidden_out = hidden + +if($W1_out != "/dev/null") write(W1_out, $W1_out) +if($Wlast_out != "/dev/null") write(Wlast_out, $Wlast_out) +if($hidden_out != "/dev/null") write(hidden_out, $hidden_out) diff --git a/scripts/builtin/autoencoder_2layer.dml b/scripts/builtin/autoencoder_2layer.dml index ae2d30e67a8..1993680ab9f 100644 --- a/scripts/builtin/autoencoder_2layer.dml +++ b/scripts/builtin/autoencoder_2layer.dml @@ -19,12 +19,14 @@ # #------------------------------------------------------------- +# ============================================================================= +# Autoencoder 2-layer + generalized with Parameter Server training # Trains a 2-layer autoencoder with minibatch SGD and step-size decay. # If invoked with H1 > H2 then it becomes a 'bowtie' structured autoencoder # Weights are initialized using Glorot & Bengio (2010) AISTATS initialization. # The script standardizes the input before training (can be turned off). # Also, it randomly reshuffles rows before training. -# Currently, tanh is set to be the activation function. +# Currently, tanh is set to be the activation function. # By re-implementing 'func' DML-bodied function, one can change the activation. # # INPUT: @@ -34,14 +36,22 @@ # num_hidden2 Number of neurons in the 2nd hidden layer # max_epochs Number of epochs to train for # full_obj If TRUE, Computes objective function value (squared-loss) -# at the end of each epoch. Note that, computing the full -# objective can take a lot of time. -# batch_size Mini-batch size (training parameter) -# step Initial step size (training parameter) -# decay Decays step size after each epoch (training parameter) -# mu Momentum parameter (training parameter) +# at the end of each epoch. Note that, computing the full +# objective can take a lot of time. 
+# batch_size Mini-batch size training parameter +# step Initial step size training parameter +# decay Decays step size after each epoch training parameter +# mu Momentum parameter training parameter +# method Training method: DEFAULTSERVER or PARAMSERVER +# mode Paramserv mode: LOCAL or DISTRIBUTED (REMOTE_SPARK) # TODO: distributed needs to be implemented +# utype Paramserv update type (e.g., BSP, ASP) #TODO: Add support for ASP +# freq Paramserv update frequency (e.g., BATCH, EPOCH) +# k Number of workers +# scheme Data partitioning scheme for paramserv +# nbatches Optional number of batches per epoch for paramserv +# modelAvg Optional boolean parameter to select between updating or averaging the model in paramserver side # W1_rand Weights might be initialized via input matrices -# W2_rand --- +# W2_rand --- # W3_rand --- # W4_rand --- # --------------------------------------------------------------------------------------------- @@ -59,10 +69,67 @@ # HIDDEN Matrix storing the hidden (2nd) layer representation if needed # ---------------------------------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- +# Generalized autoencoder entry point (arbitrary encoder depth) +# ----------------------------------------------------------------------------- +m_autoencoder = function(Matrix[Double] X, list[unknown] hidden_layers, Integer max_epochs, + Boolean full_obj = FALSE, Integer batch_size = 256, Double step = 1e-5, Double decay = 0.95, Double mu = 0.9, + String method = "DEFAULTSERVER", String mode = "LOCAL", String utype = "BSP", String freq = "BATCH", + Integer k = 1, String scheme = "DISJOINT_RANDOM", Integer nbatches = 0, Boolean modelAvg = FALSE, + Matrix[Double] order_rand = matrix(0, rows=0, cols=0)) + return(list[unknown] model, Matrix[Double] reordered_H) +{ + # hidden_layers defines encoder sizes e.g., list(128, 64). + # Decoder mirrors encoder automatically. + n = nrow(X) + m = ncol(X) + #randomly reordering rows + if(nrow(order_rand) == 0 & ncol(order_rand) == 0) + permut = table(seq(1,n,1), order(target=Rand(rows=n, cols=1, min=0, max=1, pdf="uniform"), by=1, index.return=TRUE), n, n) + else + permut = table(seq(1,n,1), order(target=order_rand, by=1, index.return=TRUE), n, n) + X = permut %*% X + + #z-transform, whitening operator is better + means = colSums(X)/n + stds = sqrt((colSums(X^2)/n - means*means)*n/(n-1)) + 1e-17 + X = (X - means)/stds + + # Build full layer sizes including mirrored decoder and output layer. + layer_sizes = build_layer_sizes(m, hidden_layers) + layer_count = length(layer_sizes) - 1 + # Initialize weights, biases, and momentum buffers. + [W_list, b_list, upd_W_list, upd_b_list] = init_layers(layer_sizes) + + if(method == "DEFAULTSERVER") { + # Single-process training loop with full control over epochs/batches. + [W_list, b_list, upd_W_list, upd_b_list, reordered_H] = train_default_layers( + X, permut, W_list, b_list, upd_W_list, upd_b_list, + hidden_layers, max_epochs, full_obj, batch_size, step, decay, mu) + } + else if(method == "PARAMSERVER") { + # Parameter server training via paramserv builtin. + [W_list, b_list, upd_W_list, upd_b_list, reordered_H] = train_paramserv_layers( + X, permut, W_list, b_list, upd_W_list, upd_b_list, hidden_layers, layer_count, + max_epochs, batch_size, step, mu, mode, utype, freq, k, scheme, nbatches, modelAvg) + } + else { + stop("Unsupported method: " + method + ". 
Use DEFAULTSERVER or PARAMSERVER.") + } + + # Return flattened model list for compatibility with paramserv conventions. + model = flatten_model(W_list, b_list, upd_W_list, upd_b_list) +} + +# ----------------------------------------------------------------------------- +# Legacy 2-layer autoencoder entry point +# ----------------------------------------------------------------------------- m_autoencoder_2layer = function(Matrix[Double] X, Integer num_hidden1, Integer num_hidden2, Integer max_epochs, Boolean full_obj = FALSE, Integer batch_size = 256, Double step = 1e-5, Double decay = 0.95, Double mu = 0.9, - Matrix[Double] W1_rand = matrix(0, rows=0, cols=0), Matrix[Double] W2_rand = matrix(0, rows=0, cols=0), - Matrix[Double] W3_rand = matrix(0, rows=0, cols=0), Matrix[Double] W4_rand = matrix(0, rows=0, cols=0), + String method = "DEFAULTSERVER", String mode = "LOCAL", String utype = "BSP", String freq = "BATCH", + Integer k = 1, String scheme = "DISJOINT_RANDOM", Integer nbatches = 0, Boolean modelAvg = FALSE, + Matrix[Double] W1_rand = matrix(0, rows=0, cols=0), Matrix[Double] W2_rand = matrix(0, rows=0, cols=0), + Matrix[Double] W3_rand = matrix(0, rows=0, cols=0), Matrix[Double] W4_rand = matrix(0, rows=0, cols=0), Matrix[Double] order_rand = matrix(0, rows=0, cols=0)) return(Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, Matrix[Double] reordered_H) @@ -70,9 +137,9 @@ m_autoencoder_2layer = function(Matrix[Double] X, Integer num_hidden1, Integer n n = nrow(X) m = ncol(X) #randomly reordering rows - if(nrow(order_rand) == 0 & ncol(order_rand) == 0) + if(nrow(order_rand) == 0 & ncol(order_rand) == 0) permut = table(seq(1,n,1), order(target=Rand(rows=n, cols=1, min=0, max=1, pdf="uniform"), by=1, index.return=TRUE), n, n) - else + else permut = table(seq(1,n,1), order(target=order_rand, by=1, index.return=TRUE), n, n) X = permut %*% X @@ -80,25 +147,144 @@ m_autoencoder_2layer = function(Matrix[Double] X, Integer num_hidden1, Integer n means = colSums(X)/n stds = sqrt((colSums(X^2)/n - means*means)*n/(n-1)) + 1e-17 X = (X - means)/stds - - if(nrow(W1_rand) == 0 & ncol(W1_rand) == 0) + + if(nrow(W1_rand) == 0 & ncol(W1_rand) == 0) W1_rand = Rand(rows=num_hidden1, cols=m, min=-1, max=1, pdf="uniform") - if(nrow(W2_rand) == 0 & ncol(W2_rand) == 0) + if(nrow(W2_rand) == 0 & ncol(W2_rand) == 0) W2_rand = Rand(rows=num_hidden2, cols=num_hidden1, min=-1, max=1, pdf="uniform") - if(nrow(W3_rand) == 0 & ncol(W3_rand) == 0) + if(nrow(W3_rand) == 0 & ncol(W3_rand) == 0) W3_rand = Rand(rows=num_hidden1, cols=num_hidden2, min=-1, max=1, pdf="uniform") - if(nrow(W4_rand) == 0 & ncol(W4_rand) == 0) + if(nrow(W4_rand) == 0 & ncol(W4_rand) == 0) W4_rand = Rand(rows=m, cols=num_hidden1, min=-1, max=1, pdf="uniform") - W1 = sqrt(6)/sqrt(m + num_hidden1) * W1_rand + W1 = sqrt(6)/sqrt(m + num_hidden1) * W1_rand b1 = matrix(0, rows=num_hidden1, cols=1) - W2 = sqrt(6)/sqrt(num_hidden1 + num_hidden2) * W2_rand + W2 = sqrt(6)/sqrt(num_hidden1 + num_hidden2) * W2_rand b2 = matrix(0, rows=num_hidden2, cols=1) - W3 = sqrt(6)/sqrt(num_hidden2 + num_hidden1) * W3_rand + W3 = sqrt(6)/sqrt(num_hidden2 + num_hidden1) * W3_rand b3 = matrix(0, rows=num_hidden1, cols=1) - W4 = sqrt(6)/sqrt(num_hidden2 + m) * W4_rand + W4 = sqrt(6)/sqrt(num_hidden1 + m) * W4_rand b4 = matrix(0, rows=m, cols=1) + if(method == "DEFAULTSERVER") { + [W1, b1, W2, b2, W3, b3, W4, b4, reordered_H] = train_default( + X, permut, W1, b1, W2, b2, 
W3, b3, W4, b4, + max_epochs, full_obj, batch_size, step, decay, mu) + } + else if(method == "PARAMSERVER") { + [W1, b1, W2, b2, W3, b3, W4, b4, reordered_H] = train_paramserv( + X, permut, W1, b1, W2, b2, W3, b3, W4, b4, + max_epochs, batch_size, step, mu, mode, utype, freq, k, scheme, nbatches, modelAvg) + } + else { + stop("Unsupported method: " + method + ". Use DEFAULTSERVER or PARAMSERVER.") + } +} + +# ----------------------------------------------------------------------------- +# Activation function and default: tanh +# ----------------------------------------------------------------------------- +# To use another activation, implement a DML-bodied function named 'func' +# and comment out this one. +func = function(Matrix[Double] X) return(Matrix[Double] Y, Matrix[Double] Y_prime){ + Y = (exp(2*X) - 1)/(exp(2*X) + 1) + Y_prime = 1 - Y^2 +} + +# ----------------------------------------------------------------------------- +# Fixed 2-layer helpers +# ----------------------------------------------------------------------------- +forward_pass = function(Matrix[Double] X, Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, Matrix[Double] Y) + return(Matrix[Double] H1, Matrix[Double] H1_prime,Matrix[Double] H2, Matrix[Double] H2_prime, + Matrix[Double] H3, Matrix[Double] H3_prime, Matrix[Double] Yhat, Matrix[Double] Yhat_prime, Matrix[Double] E) +{ + H1_in = t(W1 %*% t(X) + b1) + [H1, H1_prime] = func(H1_in) + + H2_in = t(W2 %*% t(H1) + b2) + [H2, H2_prime] = func(H2_in) + + H3_in = t(W3 %*% t(H2) + b3) + [H3, H3_prime] = func(H3_in) + + Yhat_in = t(W4 %*% t(H3) + b4) + [Yhat, Yhat_prime] = func(Yhat_in) + E = Yhat - Y +} + +backward_pass = function(Matrix[Double] X, Matrix[Double] H1, Matrix[Double] H1_prime, Matrix[Double] H2, Matrix[Double] H2_prime, + Matrix[Double] H3, Matrix[Double] H3_prime, Matrix[Double] Yhat_prime, Matrix[Double] E, + Matrix[Double] W1, Matrix[Double] W2, Matrix[Double] W3, Matrix[Double] W4) + return(Matrix[Double] W1_grad, Matrix[Double] b1_grad,Matrix[Double] W2_grad, Matrix[Double] b2_grad, + Matrix[Double] W3_grad, Matrix[Double] b3_grad, Matrix[Double] W4_grad, Matrix[Double] b4_grad) +{ + #backprop + delta4 = E * Yhat_prime + delta3 = H3_prime * (delta4 %*% W4) + delta2 = H2_prime * (delta3 %*% W3) + delta1 = H1_prime * (delta2 %*% W2) + + #compute gradients + b4_grad = t(colSums(delta4)) + b3_grad = t(colSums(delta3)) + b2_grad = t(colSums(delta2)) + b1_grad = t(colSums(delta1)) + + W4_grad = t(delta4) %*% H3 + W3_grad = t(delta3) %*% H2 + W2_grad = t(delta2) %*% H1 + W1_grad = t(delta1) %*% X +} + +obj = function(Matrix[Double] E) return(Double val){ + val = 0.5 * sum(E^2) +} + +# ----------------------------------------------------------------------------- +# Fixed 2-layer training = DEFAULTSERVER + PARAMSERVER +# ----------------------------------------------------------------------------- +apply_update = function(Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, + Matrix[Double] upd_W1, Matrix[Double] upd_b1, Matrix[Double] upd_W2, Matrix[Double] upd_b2, + Matrix[Double] upd_W3, Matrix[Double] upd_b3, Matrix[Double] upd_W4, Matrix[Double] upd_b4, + Matrix[Double] W1_grad, Matrix[Double] b1_grad, Matrix[Double] W2_grad, Matrix[Double] b2_grad, + Matrix[Double] W3_grad, Matrix[Double] b3_grad, Matrix[Double] W4_grad, Matrix[Double] b4_grad, + 
Double step, Double mu, Double batch_rows) + return(Matrix[Double] W1_out, Matrix[Double] b1_out, Matrix[Double] W2_out, Matrix[Double] b2_out, + Matrix[Double] W3_out, Matrix[Double] b3_out, Matrix[Double] W4_out, Matrix[Double] b4_out, + Matrix[Double] upd_W1_out, Matrix[Double] upd_b1_out, Matrix[Double] upd_W2_out, Matrix[Double] upd_b2_out, + Matrix[Double] upd_W3_out, Matrix[Double] upd_b3_out, Matrix[Double] upd_W4_out, Matrix[Double] upd_b4_out) +{ + local_step = step / batch_rows + upd_W1_out = mu * upd_W1 - local_step * W1_grad + upd_b1_out = mu * upd_b1 - local_step * b1_grad + upd_W2_out = mu * upd_W2 - local_step * W2_grad + upd_b2_out = mu * upd_b2 - local_step * b2_grad + upd_W3_out = mu * upd_W3 - local_step * W3_grad + upd_b3_out = mu * upd_b3 - local_step * b3_grad + upd_W4_out = mu * upd_W4 - local_step * W4_grad + upd_b4_out = mu * upd_b4 - local_step * b4_grad + + W1_out = W1 + upd_W1_out + b1_out = b1 + upd_b1_out + W2_out = W2 + upd_W2_out + b2_out = b2 + upd_b2_out + W3_out = W3 + upd_W3_out + b3_out = b3 + upd_b3_out + W4_out = W4 + upd_W4_out + b4_out = b4 + upd_b4_out +} + +train_default = function(Matrix[Double] X, Matrix[Double] permut, + Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, + Integer max_epochs, Boolean full_obj, Integer batch_size, Double step, Double decay, Double mu) + return(Matrix[Double] W1_out, Matrix[Double] b1_out, Matrix[Double] W2_out, Matrix[Double] b2_out, + Matrix[Double] W3_out, Matrix[Double] b3_out, Matrix[Double] W4_out, Matrix[Double] b4_out, + Matrix[Double] reordered_H) +{ + n = nrow(X) upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1)) upd_b1 = matrix(0, rows=nrow(b1), cols=ncol(b1)) upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2)) @@ -109,8 +295,8 @@ m_autoencoder_2layer = function(Matrix[Double] X, Integer num_hidden1, Integer n upd_b4 = matrix(0, rows=nrow(b4), cols=ncol(b4)) if( full_obj ){ - [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, - full_Yhat_prime, full_E] = feedForward(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) full_o = obj(full_E) print("EPOCHS=" + 0 + " OBJ (FULL DATA): " + full_o) } @@ -125,32 +311,20 @@ m_autoencoder_2layer = function(Matrix[Double] X, Integer num_hidden1, Integer n if(end > n) end = n X_batch = X[beg:end,] - [H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat, Yhat_prime, E] - = feedForward(X_batch, W1, b1, W2, b2, W3, b3, W4, b4, X_batch) - [W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad] - = grad(X_batch, H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat_prime, E, W1, W2, W3, W4) + [H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat, Yhat_prime, E] + = forward_pass(X_batch, W1, b1, W2, b2, W3, b3, W4, b4, X_batch) + [W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad] + = backward_pass(X_batch, H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat_prime, E, W1, W2, W3, W4) o = obj(E) - print("epochs=%5.4f BATCH beg=%d end=%d obj=%f", (iter / num_iters_per_epoch), beg, end, o) - - #update - local_step = step / nrow(X_batch) - upd_W1 = mu * upd_W1 - local_step * W1_grad - upd_b1 = mu * upd_b1 - local_step * b1 - upd_W2 = mu * upd_W2 - local_step * W2_grad - upd_b2 = mu * upd_b2 - local_step * b2 - upd_W3 = mu * upd_W3 - local_step * W3_grad - upd_b3 = mu * upd_b3 - 
local_step * b3 - upd_W4 = mu * upd_W4 - local_step * W4_grad - upd_b4 = mu * upd_b4 - local_step * b4 - W1 = W1 + upd_W1 - b1 = b1 + upd_b1 - W2 = W2 + upd_W2 - b2 = b2 + upd_b2 - W3 = W3 + upd_W3 - b3 = b3 + upd_b3 - W4 = W4 + upd_W4 - b4 = b4 + upd_b4 + print("epochs=" + (iter / num_iters_per_epoch) + " BATCH beg=" + beg + " end=" + end + " obj=" + o) + + [W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4] = apply_update( + W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4, + W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad, + step, mu, nrow(X_batch)) iter = iter + 1 if(end == n) beg = 1 @@ -159,70 +333,534 @@ m_autoencoder_2layer = function(Matrix[Double] X, Integer num_hidden1, Integer n if( iter %% num_iters_per_epoch == 0 ) step = step * decay if( full_obj & iter %% num_iters_per_epoch == 0 ){ - [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, - full_Yhat_prime, full_E] = feedForward(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) full_o = obj(full_E) epochs = iter %/% num_iters_per_epoch print("EPOCHS=" + epochs + " iter=" + iter + " OBJ (FULL DATA)=" + full_o) } } - [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, - full_Yhat_prime, full_E] = feedForward(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) reordered_H = t(permut) %*% full_H2 + + W1_out = W1 + b1_out = b1 + W2_out = W2 + b2_out = b2 + W3_out = W3 + b3_out = b3 + W4_out = W4 + b4_out = b4 } -#implements tanh -#to use another activation fn, implement a DML-bodied function with -#function name 'func' and comment out this one -func = function(Matrix[Double] X) return(Matrix[Double] Y, Matrix[Double] Y_prime){ - Y = (exp(2*X) - 1)/(exp(2*X) + 1) - Y_prime = 1 - Y^2 +train_paramserv = function(Matrix[Double] X, Matrix[Double] permut, + Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, + Integer max_epochs, Integer batch_size, Double step, Double mu, String mode, + String utype, String freq, Integer k, String scheme, Integer nbatches, Boolean modelAvg) + return(Matrix[Double] W1_out, Matrix[Double] b1_out, Matrix[Double] W2_out, Matrix[Double] b2_out, + Matrix[Double] W3_out, Matrix[Double] b3_out, Matrix[Double] W4_out, Matrix[Double] b4_out, + Matrix[Double] reordered_H) +{ + upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1)) + upd_b1 = matrix(0, rows=nrow(b1), cols=ncol(b1)) + upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2)) + upd_b2 = matrix(0, rows=nrow(b2), cols=ncol(b2)) + upd_W3 = matrix(0, rows=nrow(W3), cols=ncol(W3)) + upd_b3 = matrix(0, rows=nrow(b3), cols=ncol(b3)) + upd_W4 = matrix(0, rows=nrow(W4), cols=ncol(W4)) + upd_b4 = matrix(0, rows=nrow(b4), cols=ncol(b4)) + + modelList = list(W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4) + params = list(step=step, mu=mu) + + ps_mode = mode + if(mode == "DISTRIBUTED") + ps_mode = "REMOTE_SPARK" + + if(nbatches > 0) { + modelList2 = paramserv(model=modelList, features=X, labels=X, + 
upd="./scripts/builtin/autoencoder_2layer.dml::gradients", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", + nbatches=nbatches, modelAvg=modelAvg) + } + else { + modelList2 = paramserv(model=modelList, features=X, labels=X, + upd="./scripts/builtin/autoencoder_2layer.dml::gradients", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", modelAvg=modelAvg) + } + + W1 = as.matrix(modelList2[1]) + b1 = as.matrix(modelList2[2]) + W2 = as.matrix(modelList2[3]) + b2 = as.matrix(modelList2[4]) + W3 = as.matrix(modelList2[5]) + b3 = as.matrix(modelList2[6]) + W4 = as.matrix(modelList2[7]) + b4 = as.matrix(modelList2[8]) + + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + reordered_H = t(permut) %*% full_H2 + + W1_out = W1 + b1_out = b1 + W2_out = W2 + b2_out = b2 + W3_out = W3 + b3_out = b3 + W4_out = W4 + b4_out = b4 } -feedForward = function(Matrix[Double] X, Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, - Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, Matrix[Double] Y) - return(Matrix[Double] H1, Matrix[Double] H1_prime,Matrix[Double] H2, Matrix[Double] H2_prime, - Matrix[Double] H3, Matrix[Double] H3_prime, Matrix[Double] Yhat, Matrix[Double] Yhat_prime, Matrix[Double] E) +gradients = function(list[unknown] model, + list[unknown] hyperparams, + matrix[double] features, + matrix[double] labels) + return(list[unknown] gradients) { - H1_in = t(W1 %*% t(X) + b1) - [H1, H1_prime] = func(H1_in) + W1 = as.matrix(model[1]) + b1 = as.matrix(model[2]) + W2 = as.matrix(model[3]) + b2 = as.matrix(model[4]) + W3 = as.matrix(model[5]) + b3 = as.matrix(model[6]) + W4 = as.matrix(model[7]) + b4 = as.matrix(model[8]) - H2_in = t(W2 %*% t(H1) + b2) - [H2, H2_prime] = func(H2_in) + [H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat, Yhat_prime, E] + = forward_pass(features, W1, b1, W2, b2, W3, b3, W4, b4, labels) + [W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad] + = backward_pass(features, H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat_prime, E, W1, W2, W3, W4) - H3_in = t(W3 %*% t(H2) + b3) - [H3, H3_prime] = func(H3_in) + gradients = list(W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad, nrow(features)) +} - Yhat_in = t(W4 %*% t(H3) + b4) - [Yhat, Yhat_prime] = func(Yhat_in) +aggregation = function(list[unknown] model, + list[unknown] hyperparams, + list[unknown] gradients) + return(list[unknown] modelResult) +{ + W1 = as.matrix(model[1]) + b1 = as.matrix(model[2]) + W2 = as.matrix(model[3]) + b2 = as.matrix(model[4]) + W3 = as.matrix(model[5]) + b3 = as.matrix(model[6]) + W4 = as.matrix(model[7]) + b4 = as.matrix(model[8]) + upd_W1 = as.matrix(model[9]) + upd_b1 = as.matrix(model[10]) + upd_W2 = as.matrix(model[11]) + upd_b2 = as.matrix(model[12]) + upd_W3 = as.matrix(model[13]) + upd_b3 = as.matrix(model[14]) + upd_W4 = as.matrix(model[15]) + upd_b4 = as.matrix(model[16]) + + W1_grad = as.matrix(gradients[1]) + b1_grad = as.matrix(gradients[2]) + W2_grad = as.matrix(gradients[3]) + b2_grad = as.matrix(gradients[4]) + W3_grad = as.matrix(gradients[5]) + b3_grad = 
as.matrix(gradients[6]) + W4_grad = as.matrix(gradients[7]) + b4_grad = as.matrix(gradients[8]) + batch_rows = as.double(as.scalar(gradients[9])) + step = as.double(as.scalar(hyperparams["step"])) + mu = as.double(as.scalar(hyperparams["mu"])) + + [W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4] = apply_update( + W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4, + W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad, + step, mu, batch_rows) + + modelResult = list(W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4) +} + +# ----------------------------------------------------------------------------- +# Generalized autoencoder helpers to support variable depth +# ----------------------------------------------------------------------------- +build_layer_sizes = function(Integer input_dim, list[unknown] hidden_layers) + return(list[unknown] sizes) +{ + # Build encoder/decoder layer sizes as: + # [input] + hidden_layers + reverse(hidden_layers) + [input] + sizes = list(input_dim) + for(i in 1:length(hidden_layers)) { + sizes = append(sizes, list(as.integer(as.scalar(hidden_layers[i])))) + } + if(length(hidden_layers) > 1) { + for(i in (length(hidden_layers)-1):1) { + sizes = append(sizes, list(as.integer(as.scalar(hidden_layers[i])))) + } + } + sizes = append(sizes, list(input_dim)) +} + +init_layers = function(list[unknown] sizes) + return(list[unknown] W_list, list[unknown] b_list, list[unknown] upd_W_list, list[unknown] upd_b_list) +{ + # Initialize weights with Glorot scaling and zero biases/momentum buffers. + W_list = list() + b_list = list() + upd_W_list = list() + upd_b_list = list() + for(i in 1:(length(sizes)-1)) { + in_dim = as.integer(as.scalar(sizes[i])) + out_dim = as.integer(as.scalar(sizes[i+1])) + W = Rand(rows=out_dim, cols=in_dim, min=-1, max=1, pdf="uniform") + W = sqrt(6)/sqrt(in_dim + out_dim) * W + b = matrix(0, rows=out_dim, cols=1) + W_list = append(W_list, list(W)) + b_list = append(b_list, list(b)) + upd_W_list = append(upd_W_list, list(matrix(0, rows=out_dim, cols=in_dim))) + upd_b_list = append(upd_b_list, list(matrix(0, rows=out_dim, cols=1))) + } +} + +forward_pass_layers = function(Matrix[Double] X, list[unknown] W_list, list[unknown] b_list, Matrix[Double] Y) + return(list[unknown] activations, list[unknown] primes, Matrix[Double] Yhat, Matrix[Double] E) +{ + activations = list() + primes = list() + + A_prev = X + for(i in 1:length(W_list)) { + W = as.matrix(W_list[i]) + b = as.matrix(b_list[i]) + + Z = t(W %*% t(A_prev) + b) + [A, A_prime] = func(Z) + + activations = append(activations, list(A)) + primes = append(primes, list(A_prime)) + + A_prev = A + } + + Yhat = A_prev E = Yhat - Y } -grad = function(Matrix[Double] X, Matrix[Double] H1, Matrix[Double] H1_prime, Matrix[Double] H2, Matrix[Double] H2_prime, - Matrix[Double] H3, Matrix[Double] H3_prime, Matrix[Double] Yhat_prime, Matrix[Double] E, - Matrix[Double] W1, Matrix[Double] W2, Matrix[Double] W3, Matrix[Double] W4) - return(Matrix[Double] W1_grad, Matrix[Double] b1_grad,Matrix[Double] W2_grad, Matrix[Double] b2_grad, - Matrix[Double] W3_grad, Matrix[Double] b3_grad, Matrix[Double] W4_grad, Matrix[Double] b4_grad) + +backward_pass_layers = function(Matrix[Double] X, list[unknown] activations, list[unknown] primes, + list[unknown] W_list, Matrix[Double] E) + return(list[unknown] grads_W, list[unknown] grads_b) { - #backprop - delta4 = E * 
Yhat_prime - delta3 = H3_prime * (delta4 %*% W4) - delta2 = H2_prime * (delta3 %*% W3) - delta1 = H1_prime * (delta2 %*% W2) + L = length(W_list) + grads_W = list() + grads_b = list() - #compute gradients - b4_grad = t(colSums(delta4)) - b3_grad = t(colSums(delta3)) - b2_grad = t(colSums(delta2)) - b1_grad = t(colSums(delta1)) + delta = E * as.matrix(primes[L]) - W4_grad = t(delta4) %*% H3 - W3_grad = t(delta3) %*% H2 - W2_grad = t(delta2) %*% H1 - W1_grad = t(delta1) %*% X + for(i in L:1) { + A_prev = X + if(i > 1) + A_prev = as.matrix(activations[i-1]) + + grad_W = t(delta) %*% A_prev + grad_b = t(colSums(delta)) + + grads_W = append(grads_W, list(grad_W)) + grads_b = append(grads_b, list(grad_b)) + + if(i > 1) { + W_curr = as.matrix(W_list[i]) + delta = as.matrix(primes[i-1]) * (delta %*% W_curr) + } + } } -obj = function(Matrix[Double] E) return(Double val){ - val = 0.5 * sum(E^2) + +apply_update_layers = function(list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list, + list[unknown] grads_W, list[unknown] grads_b, + Double step, Double mu, Double batch_rows) + return(list[unknown] W_out, list[unknown] b_out, list[unknown] upd_W_out, list[unknown] upd_b_out) +{ + # Apply SGD with momentum updates across all layers. + W_out = list() + b_out = list() + upd_W_out = list() + upd_b_out = list() + local_step = step / batch_rows + for(i in 1:length(W_list)) { + W = as.matrix(W_list[i]) + b = as.matrix(b_list[i]) + upd_W = as.matrix(upd_W_list[i]) + upd_b = as.matrix(upd_b_list[i]) + idx = length(W_list) - i + 1 + dW = as.matrix(grads_W[idx]) + db = as.matrix(grads_b[idx]) + + upd_W = mu * upd_W - local_step * dW + upd_b = mu * upd_b - local_step * db + W = W + upd_W + b = b + upd_b + + W_out = append(W_out, list(W)) + b_out = append(b_out, list(b)) + upd_W_out = append(upd_W_out, list(upd_W)) + upd_b_out = append(upd_b_out, list(upd_b)) + } +} + +train_default_layers = function(Matrix[Double] X, Matrix[Double] permut, + list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list, + list[unknown] hidden_layers, Integer max_epochs, Boolean full_obj, + Integer batch_size, Double step, Double decay, Double mu) + return(list[unknown] W_out, list[unknown] b_out, + list[unknown] upd_W_out, list[unknown] upd_b_out, + Matrix[Double] reordered_H) +{ + # Default single-process training loop for variable-depth autoencoder. 
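  # Illustrative note on the minibatch bookkeeping below (assumed numbers): with
  # n = 1000 rows and batch_size = 256, num_iters_per_epoch = ceil(1000/256) = 4,
  # max_iterations = max_epochs * 4, and 'step' is multiplied by 'decay' once per
  # completed epoch.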
+ n = nrow(X) + encoder_layers = length(hidden_layers) + if( full_obj ){ + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + full_o = obj(full_E) + print("EPOCHS=" + 0 + " OBJ (FULL DATA): " + full_o) + } + + iter = 0 + num_iters_per_epoch = ceil(n / batch_size) + max_iterations = max_epochs * num_iters_per_epoch + beg = 1 + while( iter < max_iterations ){ + end = beg + batch_size - 1 + if(end > n) end = n + X_batch = X[beg:end,] + + [activations, primes, Yhat, E] = forward_pass_layers(X_batch, W_list, b_list, X_batch) + [grads_W, grads_b] = backward_pass_layers(X_batch, activations, primes, W_list, E) + + o = obj(E) + print("epochs=" + (iter / num_iters_per_epoch) + " BATCH beg=" + beg + " end=" + end + " obj=" + o) + + [W_list, b_list, upd_W_list, upd_b_list] = apply_update_layers( + W_list, b_list, upd_W_list, upd_b_list, grads_W, grads_b, step, mu, nrow(X_batch)) + + iter = iter + 1 + if(end == n) beg = 1 + else beg = end + 1 + + if( iter %% num_iters_per_epoch == 0 ) step = step * decay + + if( full_obj & iter %% num_iters_per_epoch == 0 ){ + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + full_o = obj(full_E) + epochs = iter %/% num_iters_per_epoch + print("EPOCHS=" + epochs + " iter=" + iter + " OBJ (FULL DATA)=" + full_o) + } + } + + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + # Return the encoder bottleneck representation. + hidden = as.matrix(full_act[encoder_layers]) + reordered_H = t(permut) %*% hidden + + W_out = W_list + b_out = b_list + upd_W_out = upd_W_list + upd_b_out = upd_b_list } + +flatten_model = function(list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list) + return(list[unknown] modelList) +{ + modelList = list() + + for(i in 1:length(W_list)) + modelList = append(modelList, list(as.matrix(W_list[i]))) + + for(i in 1:length(b_list)) + modelList = append(modelList, list(as.matrix(b_list[i]))) + + for(i in 1:length(upd_W_list)) + modelList = append(modelList, list(as.matrix(upd_W_list[i]))) + + for(i in 1:length(upd_b_list)) + modelList = append(modelList, list(as.matrix(upd_b_list[i]))) +} + +unflatten_model = function(list[unknown] modelList, Integer layer_count) + return(list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list) +{ + W_list = list() + b_list = list() + upd_W_list = list() + upd_b_list = list() + + for(i in 1:layer_count) + W_list = append(W_list, list(as.matrix(modelList[i]))) + + for(i in 1:layer_count) + b_list = append(b_list, list(as.matrix(modelList[layer_count + i]))) + + for(i in 1:layer_count) + upd_W_list = append(upd_W_list, list(as.matrix(modelList[2*layer_count + i]))) + + for(i in 1:layer_count) + upd_b_list = append(upd_b_list, list(as.matrix(modelList[3*layer_count + i]))) +} + +train_paramserv_layers = function(Matrix[Double] X, Matrix[Double] permut, + list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list, + list[unknown] hidden_layers, Integer layer_count, Integer max_epochs, + Integer batch_size, Double step, Double mu, String mode, + String utype, String freq, Integer k, String scheme, Integer nbatches, Boolean modelAvg) + return(list[unknown] W_out, list[unknown] b_out, + list[unknown] upd_W_out, list[unknown] upd_b_out, + Matrix[Double] reordered_H) +{ + modelList = list() + + for(i in 1:layer_count) + modelList = append(modelList, list(as.matrix(W_list[i]))) + + for(i in 
1:layer_count) + modelList = append(modelList, list(as.matrix(b_list[i]))) + + if(length(upd_W_list) == 0) { + for(i in 1:layer_count) + upd_W_list = append(upd_W_list, + list(matrix(0, rows=nrow(as.matrix(W_list[i])), cols=ncol(as.matrix(W_list[i]))))) + } + if(length(upd_b_list) == 0) { + for(i in 1:layer_count) + upd_b_list = append(upd_b_list, + list(matrix(0, rows=nrow(as.matrix(b_list[i])), cols=ncol(as.matrix(b_list[i]))))) + } + + for(i in 1:layer_count) + modelList = append(modelList, list(as.matrix(upd_W_list[i]))) + + for(i in 1:layer_count) + modelList = append(modelList, list(as.matrix(upd_b_list[i]))) + + params = list(step=step, mu=mu, layer_count=layer_count, batch_size=batch_size) + encoder_layers = length(hidden_layers) + + ps_mode = mode + if(mode == "DISTRIBUTED") + ps_mode = "REMOTE_SPARK" + + if(nbatches > 0) { + modelList2 = paramserv(model=modelList, features=X, labels=X, + upd="./scripts/builtin/autoencoder_2layer.dml::gradients_layers", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation_layers", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", + nbatches=nbatches, modelAvg=modelAvg) + } + else { + modelList2 = paramserv(model=modelList, features=X, labels=X, + upd="./scripts/builtin/autoencoder_2layer.dml::gradients_layers", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation_layers", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", modelAvg=modelAvg) + } + + [W_list, b_list, upd_W_list, upd_b_list] = unflatten_model(modelList2, layer_count) + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + hidden = as.matrix(full_act[encoder_layers]) + reordered_H = t(permut) %*% hidden + + W_out = W_list + b_out = b_list + upd_W_out = upd_W_list + upd_b_out = upd_b_list +} + + +gradients_layers = function(list[unknown] model, + list[unknown] hyperparams, + matrix[double] features, + matrix[double] labels) + return(list[unknown] gradients) +{ + layer_count = as.integer(as.scalar(hyperparams["layer_count"])) + + W_list = list() + b_list = list() + + for(i in 1:layer_count) + W_list = append(W_list, list(as.matrix(model[i]))) + + for(i in 1:layer_count) + b_list = append(b_list, list(as.matrix(model[layer_count + i]))) + + [activations, primes, Yhat, E] = forward_pass_layers(features, W_list, b_list, labels) + [grads_W, grads_b] = backward_pass_layers(features, activations, primes, W_list, E) + + gradients = list() + for(i in 1:layer_count) + gradients = append(gradients, list(as.matrix(grads_W[layer_count - i + 1]))) + for(i in 1:layer_count) + gradients = append(gradients, list(as.matrix(grads_b[layer_count - i + 1]))) +} + + +aggregation_layers = function(list[unknown] model, + list[unknown] hyperparams, + list[unknown] gradients) + return(list[unknown] modelResult) +{ + layer_count = as.integer(as.scalar(hyperparams["layer_count"])) + step = as.double(as.scalar(hyperparams["step"])) + mu = as.double(as.scalar(hyperparams["mu"])) + batch_rows = as.double(as.scalar(hyperparams["batch_size"])) + local_step = step / batch_rows + + W_out = list() + b_out = list() + upd_W_out = list() + upd_b_out = list() + + for(i in 1:layer_count) { + W = as.matrix(model[i]) + b = as.matrix(model[layer_count + i]) + upd_W = as.matrix(model[2*layer_count + i]) + upd_b = as.matrix(model[3*layer_count + i]) + + W_grad = as.matrix(gradients[i]) + b_grad = 
as.matrix(gradients[layer_count + i]) + + upd_W = mu * upd_W - local_step * W_grad + upd_b = mu * upd_b - local_step * b_grad + + W = W + upd_W + b = b + upd_b + + W_out = append(W_out, list(W)) + b_out = append(b_out, list(b)) + upd_W_out = append(upd_W_out, list(upd_W)) + upd_b_out = append(upd_b_out, list(upd_b)) + } + + modelResult = list() + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(W_out[i]))) + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(b_out[i]))) + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(upd_W_out[i]))) + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(upd_b_out[i]))) +} + diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java b/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java index e79e48433f2..a6f84e84dd5 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java @@ -63,12 +63,16 @@ public synchronized void addFunctionProgramBlock(String fkey, FunctionProgramBlo public synchronized void addFunctionProgramBlock(String namespace, String fname, FunctionProgramBlock fpb) { addFunctionProgramBlock(namespace, fname, fpb, true); + } public synchronized void addFunctionProgramBlock(String namespace, String fname, FunctionProgramBlock fpb, boolean opt) { if( fpb == null ) throw new DMLRuntimeException("Invalid null function program block."); - namespace = getSafeNamespace(namespace); + namespace = normalizeNamespace(namespace); // changed here + if ("unflatten_model".equals(fname) || "flatten_model".equals(fname)) { + System.err.println("REGISTER FUNC: ns=" + namespace + " fname=" + fname + " opt=" + opt); + } FunctionDictionary dict = _namespaces.get(namespace); if (dict == null) _namespaces.put(namespace, dict = new FunctionDictionary<>()); @@ -76,7 +80,7 @@ public synchronized void addFunctionProgramBlock(String namespace, String fname, } public synchronized void removeFunctionProgramBlock(String namespace, String fname) { - namespace = getSafeNamespace(namespace); + namespace = normalizeNamespace(namespace); // changed here FunctionDictionary dict = null; if( _namespaces.containsKey(namespace) ){ dict = _namespaces.get(namespace); @@ -106,7 +110,7 @@ public synchronized HashMap getFunctionProgramBlock } public synchronized boolean containsFunctionProgramBlock(String namespace, String fname) { - namespace = getSafeNamespace(namespace); + namespace = normalizeNamespace(namespace); // changed here return _namespaces.containsKey(namespace) && _namespaces.get(namespace).containsFunction(fname); } @@ -117,7 +121,7 @@ public synchronized boolean containsFunctionProgramBlock(String fkey, boolean op } public synchronized boolean containsFunctionProgramBlock(String namespace, String fname, boolean opt) { - namespace = getSafeNamespace(namespace); + namespace = normalizeNamespace(namespace); // changed here return _namespaces.containsKey(namespace) && _namespaces.get(namespace).containsFunction(fname, opt); } @@ -132,7 +136,7 @@ public synchronized FunctionProgramBlock getFunctionProgramBlock(String fkey, bo } public synchronized FunctionProgramBlock getFunctionProgramBlock(String namespace, String fname, boolean opt) { - namespace = getSafeNamespace(namespace); + namespace = normalizeNamespace(namespace); // changed here FunctionDictionary dict = _namespaces.get(namespace); if (dict == null) throw new DMLRuntimeException("namespace " 
+ namespace + " is undefined."); @@ -176,7 +180,26 @@ public Object clone() { ret.addFunctionProgramBlock(e1.getKey(), e2.getKey(), e2.getValue()); return ret; } - + + private static String normalizeNamespace(String namespace) { + // getNameSafe is not separator safe so i added this function to make sure that namespace is getting normalized without any conflicts + if (namespace == null) + return DMLProgram.DEFAULT_NAMESPACE; + + // normalize separators for cross-platform robustness + namespace = namespace.replace('\\', '/'); + + // normalize leading "./" + if (namespace.startsWith("./")) + namespace = namespace.substring(2); + + // collapse accidental double slashes + while (namespace.contains("//")) + namespace = namespace.replace("//", "/"); + + return namespace; + } + private static String getSafeNamespace(String namespace) { return (namespace == null) ? DMLProgram.DEFAULT_NAMESPACE : namespace; } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamServer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamServer.java index 333c889b7c1..e01145d6516 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamServer.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamServer.java @@ -394,7 +394,72 @@ protected synchronized void updateAverageModel(int workerID, ListObject model) { break; } case ASP: - throw new NotImplementedException(); + // Async model averaging: + + //TODO: needs to be implement and expanded upon. I tried implementing it but it requires a much deeper scope of synchronization which i felt was beyond my current scope + // its running into deadlocks and race condition. Could be another open-source contribution + + // Each push is interpreted as a worker "model" (list of matrices). + // We (1) weight by 1/numWorkers, (2) accumulate into _accModels, and + // (3) periodically "commit" the accumulated average as new global model. 
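				// Illustrative example of the commit policy implemented below (assumed numbers):
				// with 4 workers and 10 batches per epoch, freq=EPOCH commits every 4 pushes and
				// freq=BATCH commits every 4 * 10 = 40 pushes, i.e. once per pseudo-epoch.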
+ + // 1) weight the incoming model by 1/numWorkers (in-place) + ListObject weighted = weightModels(model, _numWorkers); + + // 2) accrue into accumulator (sum of weighted models) + _accModels = ParamservUtils.accrueGradients(_accModels, weighted, true); + + // 3) commit policy: mimic ASP pseudo-epoch boundaries from gradient mode + _syncCounter++; + + boolean doCommit = false; + if (_freq == Statement.PSFrequency.EPOCH) { + // commit after numWorkers pushes + doCommit = (_syncCounter % _numWorkers) == 0; + } + else if (_freq == Statement.PSFrequency.BATCH) { + // commit after numWorkers * numBatchesPerEpoch pushes + if (_numBatchesPerEpoch != -1) { + long period = (long) _numWorkers * (long) _numBatchesPerEpoch; + doCommit = (period > 0) && ((long) _syncCounter % period == 0); + } + else { + // fallback: commit after numWorkers pushes if unknown batches/epoch + doCommit = (_syncCounter % _numWorkers) == 0; + } + } + else if (_freq == Statement.PSFrequency.NBATCHES) { + // simplest: commit every push (adjust if you want different semantics) + doCommit = true; + } + + if (doCommit) { + // Set global model to accumulated average + _model = setParams(_ec, _accModels, _model); + _accModels = null; + + if (DMLScript.STATISTICS && tAgg != null) + ParamServStatistics.accAggregationTime((long) tAgg.stop()); + + if (LOG.isInfoEnabled()) + LOG.info("[+] PARAMSERV: completed PSEUDO EPOCH (ASP, MODELAVG) " + _epochCounter); + + time_epoch(); + + if (_validationPossible) + validate(); + + _epochCounter++; + _syncCounter = 0; + + // Broadcast committed model to all workers + broadcastModel(true); + } + else { + // Worker needs a model to continue even without committing + broadcastModel(workerID); + } + break; default: throw new DMLRuntimeException("Unsupported update: " + _updateType.name()); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java index 29ea19b7136..7cfb81840c9 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java @@ -118,6 +118,10 @@ public static void cleanupListObject(ExecutionContext ec, ListObject lo) { } public static void cleanupListObject(ExecutionContext ec, ListObject lo, boolean[] status) { + if (status != null && status.length != lo.getLength()){ + lo.deriveAndSetStatusFromData(); + status = lo.getStatus(); + } for (int i = 0; i < lo.getLength(); i++) { if (status != null && !status[i]) continue; // data ref by other object must not be cleaned up @@ -229,27 +233,53 @@ public static MatrixBlock generateReplicationMatrix(int nsamples, int nrows, lon return seq.ctableSeqOperations(sample, 1.0, new MatrixBlock(nsamples, nrows, true), false); } +// public static ExecutionContext createExecutionContext(ExecutionContext ec, +// LocalVariableMap varsMap, String updFunc, String aggFunc, int k) +// { +// return createExecutionContext(ec, varsMap, updFunc, aggFunc, k, false); +// } +// +// public static ExecutionContext createExecutionContext(ExecutionContext ec, +// LocalVariableMap varsMap, String updFunc, String aggFunc, int k, boolean forceExecTypeCP) +// { +// Program prog = ec.getProgram(); +// +// // 1. Recompile the internal program blocks +// recompileProgramBlocks(k, prog.getProgramBlocks(), forceExecTypeCP); +// // 2. 
Recompile the imported function blocks +// boolean opt = prog.getFunctionProgramBlocks(false).isEmpty(); +// prog.getFunctionProgramBlocks(opt) +// .forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks(), forceExecTypeCP)); +// +// // 3. Copy all functions +// return ExecutionContextFactory.createContext( +// new LocalVariableMap(varsMap), copyProgramFunctions(prog)); +// } public static ExecutionContext createExecutionContext(ExecutionContext ec, - LocalVariableMap varsMap, String updFunc, String aggFunc, int k) + LocalVariableMap varsMap, String updFunc, String aggFunc, int k) { return createExecutionContext(ec, varsMap, updFunc, aggFunc, k, false); } public static ExecutionContext createExecutionContext(ExecutionContext ec, - LocalVariableMap varsMap, String updFunc, String aggFunc, int k, boolean forceExecTypeCP) + LocalVariableMap varsMap, String updFunc, String aggFunc, int k, boolean forceExecTypeCP) { Program prog = ec.getProgram(); - // 1. Recompile the internal program blocks + // 1) Recompile internal program blocks recompileProgramBlocks(k, prog.getProgramBlocks(), forceExecTypeCP); - // 2. Recompile the imported function blocks - boolean opt = prog.getFunctionProgramBlocks(false).isEmpty(); - prog.getFunctionProgramBlocks(opt) - .forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks(), forceExecTypeCP)); - // 3. Copy all functions + // 2) Recompile imported function blocks for BOTH opt=false and opt=true maps + for (boolean opt : new boolean[] { false, true }) { + prog.getFunctionProgramBlocks(opt) + .forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks(), forceExecTypeCP)); + } + + // 3) Copy ALL functions (both maps) into the new Program used by workers return ExecutionContextFactory.createContext( - new LocalVariableMap(varsMap), copyProgramFunctions(prog)); + new LocalVariableMap(varsMap), + copyProgramFunctions(prog) + ); } public static List copyExecutionContext(ExecutionContext ec, int num) { @@ -261,17 +291,61 @@ public static List copyExecutionContext(ExecutionContext ec, i } private static Program copyProgramFunctions(Program prog) { +// Program newProg = new Program(prog.getDMLProg()); +// boolean opt = prog.getFunctionProgramBlocks(false).isEmpty(); +// for( Entry e : prog.getFunctionProgramBlocks(opt).entrySet() ) { +// String[] parts = DMLProgram.splitFunctionKey(e.getKey()); +// FunctionProgramBlock fpb = ProgramConverter +// .createDeepCopyFunctionProgramBlock(e.getValue(), new HashSet<>(), new HashSet<>()); +// fpb._namespace = parts[0]; +// fpb._functionName = parts[1]; +// newProg.addFunctionProgramBlock(parts[0], parts[1], fpb, opt); +// newProg.addProgramBlock(fpb); +// } +// return newProg; + LOG.warn("Entered the program"); Program newProg = new Program(prog.getDMLProg()); - boolean opt = prog.getFunctionProgramBlocks(false).isEmpty(); - for( Entry e : prog.getFunctionProgramBlocks(opt).entrySet() ) { - String[] parts = DMLProgram.splitFunctionKey(e.getKey()); - FunctionProgramBlock fpb = ProgramConverter - .createDeepCopyFunctionProgramBlock(e.getValue(), new HashSet<>(), new HashSet<>()); - fpb._namespace = parts[0]; - fpb._functionName = parts[1]; - newProg.addFunctionProgramBlock(parts[0], parts[1], fpb, opt); - newProg.addProgramBlock(fpb); + + // Copy BOTH opt=false and opt=true maps + for (boolean opt : new boolean[] { false, true }) { + for (Entry e : prog.getFunctionProgramBlocks(opt).entrySet()) { + String[] parts = DMLProgram.splitFunctionKey(e.getKey()); + + FunctionProgramBlock 
fpb = ProgramConverter + .createDeepCopyFunctionProgramBlock(e.getValue(), new HashSet<>(), new HashSet<>()); + + fpb._namespace = parts[0]; + fpb._functionName = parts[1]; + + newProg.addFunctionProgramBlock(parts[0], parts[1], fpb, opt); + newProg.addProgramBlock(fpb); + } + } + + LOG.warn("=== copyProgramFunctions: function keys (opt=false) ==="); + for (String k : newProg.getFunctionProgramBlocks(false).keySet()) + LOG.warn("FKEY(opt=false): " + k); + + LOG.warn("=== copyProgramFunctions: function keys (opt=true) ==="); + for (String k : newProg.getFunctionProgramBlocks(true).keySet()) + LOG.warn("FKEY(opt=true): " + k); + +// Quick namespace sanity check: "./scripts/..." vs "scripts/..." + String nsDot = "./scripts/builtin/autoencoder_2layer.dml"; + String nsNo = "scripts/builtin/autoencoder_2layer.dml"; + String fn = "train_default_layers"; + + try { + boolean hasDot = (newProg.getFunctionProgramBlock(nsDot, fn) != null); + boolean hasNo = (newProg.getFunctionProgramBlock(nsNo, fn) != null); + + LOG.warn("Lookup " + nsDot + "::" + fn + " => " + hasDot); + LOG.warn("Lookup " + nsNo + "::" + fn + " => " + hasNo); + } + catch (Exception ex) { + LOG.warn("Function lookup threw: " + ex.getMessage(), ex); } + return newProg; } @@ -387,6 +461,9 @@ public static ListObject accrueGradients(ListObject accGradients, ListObject gra public static ListObject accrueGradients(ListObject accGradients, ListObject gradients, boolean par, boolean cleanup) { if (accGradients == null) return ParamservUtils.copyList(gradients, cleanup); + if (accGradients.getLength() != gradients.getLength()) { + throw new DMLRuntimeException("Gradient list length mismatch: accGradients=" + accGradients.getLength() + ", gradients=" + gradients.getLength()); + } IntStream range = IntStream.range(0, accGradients.getLength()); (par ? range.parallel() : range).forEach(i -> { MatrixBlock mb1 = ((MatrixObject) accGradients.getData().get(i)).acquireReadAndRelease(); @@ -422,6 +499,8 @@ public static ListObject accrueModels(ListObject accModels, ListObject model, bo public static ListObject accrueModels(ListObject accModels, ListObject model, boolean par, boolean cleanup) { if (accModels == null) return ParamservUtils.copyList(model, cleanup); + if (accModels.getLength() != model.getLength()) + throw new DMLRuntimeException("Model list length mismatch: acc=" + accModels.getLength() + ", model=" + model.getLength()); IntStream range = IntStream.range(0, accModels.getLength()); (par ? range.parallel() : range).forEach(i -> { MatrixBlock mb1 = ((MatrixObject) accModels.getData().get(i)).acquireReadAndRelease(); diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java index d1453506a2b..c181ff67576 100644 --- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java +++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java @@ -370,43 +370,95 @@ public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramB * @param fnCreated ? * @param plain ? 
*/ - public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, Set fnStack, Set fnCreated, boolean plain) +// public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, Set fnStack, Set fnCreated, boolean plain) +// { +// //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock) +// FunctionProgramBlock fpb1 = prog.getFunctionProgramBlock(namespace, oldName, true); +// FunctionProgramBlock fpb2 = prog.containsFunctionProgramBlock(namespace, oldName, false) ? +// prog.getFunctionProgramBlock(namespace, oldName, false) : null; +// String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid); +// String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew); +// +// if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) +// return; //prevent redundant deep copy if already existent +// +// //create deep copy +// FunctionProgramBlock copy1 = null; +// if( !fnStack.contains(fnameNewKey) ) { +// fnStack.add(fnameNewKey); +// copy1 = createDeepCopyFunctionProgramBlock(fpb1, fnStack, fnCreated, pid, IDPrefix, plain); +// fnStack.remove(fnameNewKey); +// } +// else //stop deep copy for recursive function calls +// copy1 = fpb1; +// +// //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning +// //note: instructions not used by function program block +// +// //put if not existing (recursive processing might have added it) +// if( !prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) { +// prog.addFunctionProgramBlock(namespace, fnameNew, copy1, true); +// if( fpb2 != null ) { +// FunctionProgramBlock copy2 = createDeepCopyFunctionProgramBlock( +// fpb2, fnStack, fnCreated, pid, IDPrefix, plain); +// prog.addFunctionProgramBlock(namespace, fnameNew, copy2, false); +// } +// fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew)); +// } +// } + + public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, Set fnStack, Set fnCreated, boolean plain) { - //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock) - FunctionProgramBlock fpb1 = prog.getFunctionProgramBlock(namespace, oldName, true); - FunctionProgramBlock fpb2 = prog.containsFunctionProgramBlock(namespace, oldName, false) ? - prog.getFunctionProgramBlock(namespace, oldName, false) : null; - String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid); + // Try to get both opt=true and opt=false versions + // Some functions may only exist in one map + FunctionProgramBlock fpb1 = null; + FunctionProgramBlock fpb2 = null; + + if (prog.containsFunctionProgramBlock(namespace, oldName, true)) { + fpb1 = prog.getFunctionProgramBlock(namespace, oldName, true); + } + if (prog.containsFunctionProgramBlock(namespace, oldName, false)) { + fpb2 = prog.getFunctionProgramBlock(namespace, oldName, false); + } + + // If neither exists, that's an error + if (fpb1 == null && fpb2 == null) { + throw new DMLRuntimeException("function " + oldName + " is undefined in namespace " + namespace); + } + + // Continue with whichever ones we found + String fnameNew = (plain)? 
oldName :(oldName+Lop.CP_CHILD_THREAD+pid); String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew); - if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) - return; //prevent redundant deep copy if already existent - + if (prog.containsFunctionProgramBlock(namespace, fnameNew, true) + || prog.containsFunctionProgramBlock(namespace, fnameNew, false)) { + return; + } + //create deep copy FunctionProgramBlock copy1 = null; - if( !fnStack.contains(fnameNewKey) ) { + FunctionProgramBlock copy2 = null; + + if (fpb1 != null && !fnStack.contains(fnameNewKey)) { fnStack.add(fnameNewKey); copy1 = createDeepCopyFunctionProgramBlock(fpb1, fnStack, fnCreated, pid, IDPrefix, plain); fnStack.remove(fnameNewKey); } - else //stop deep copy for recursive function calls + else if (fpb1 != null) //stop deep copy for recursive function calls copy1 = fpb1; - - //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning - //note: instructions not used by function program block - - //put if not existing (recursive processing might have added it) + + //put if not existing recursive processing might have added it if( !prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) { - prog.addFunctionProgramBlock(namespace, fnameNew, copy1, true); - if( fpb2 != null ) { - FunctionProgramBlock copy2 = createDeepCopyFunctionProgramBlock( - fpb2, fnStack, fnCreated, pid, IDPrefix, plain); + if (copy1 != null) { + prog.addFunctionProgramBlock(namespace, fnameNew, copy1, true); + } + if (fpb2 != null) { + copy2 = createDeepCopyFunctionProgramBlock(fpb2, fnStack, fnCreated, pid, IDPrefix, plain); prog.addFunctionProgramBlock(namespace, fnameNew, copy2, false); } fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew)); } } - public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set fnStack, Set fnCreated) { return createDeepCopyFunctionProgramBlock(fpb, fnStack, fnCreated, 0, -1, true); } diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinAutoencoderGeneralizedBasicTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinAutoencoderGeneralizedBasicTest.java new file mode 100644 index 00000000000..962d2427f6a --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinAutoencoderGeneralizedBasicTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.functions.builtin.part1; + +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.junit.Assert; +import org.junit.Test; + +public class BuiltinAutoencoderGeneralizedBasicTest extends AutomatedTestBase { + private static final String TEST_NAME = "autoencoderGeneralized"; + private static final String TEST_DIR = "functions/builtin/"; + private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinAutoencoderGeneralizedBasicTest.class.getSimpleName() + "/"; + + private static final int ROWS = 128; + private static final int COLS = 64; + private static final double DENSE = 0.9; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, + new String[] { "W1_out", "Wlast_out", "hidden_out" })); + } + + @Test + public void testAutoencoderThreeLayerOutputs() { + runAutoencoderTest(16, 8, 4, 2, DENSE); + } + + private void runAutoencoderTest(int h1, int h2, int h3, int maxEpochs, double sparsity) { + loadTestConfiguration(getTestConfiguration(TEST_NAME)); + + String home = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = home + TEST_NAME + ".dml"; + programArgs = new String[] { "-nvargs", + "X=" + input("X"), + "H1=" + h1, "H2=" + h2, "H3=" + h3, + "EPOCH=" + maxEpochs, "BATCH=" + 32, + "STEP=" + 1e-4, "DECAY=" + 0.95, "MOMENTUM=" + 0.9, + "METHOD=DEFAULTSERVER", "MODE=LOCAL", "UTYPE=BSP", + "FREQ=BATCH", "WORKERS=1", "SCHEME=DISJOINT_RANDOM", + "NBATCHES=0", "MODELAVG=FALSE", + "W1_out=" + output("W1_out"), + "Wlast_out=" + output("Wlast_out"), + "hidden_out=" + output("hidden_out") + }; + + double[][] X = getRandomMatrix(ROWS, COLS, 0, 1, sparsity, 42); + writeInputMatrixWithMTD("X", X, true); + + runTest(true, false, null, -1); + + MatrixCharacteristics w1Meta = readDMLMetaDataFile("W1_out"); + MatrixCharacteristics wlastMeta = readDMLMetaDataFile("Wlast_out"); + MatrixCharacteristics hiddenMeta = readDMLMetaDataFile("hidden_out"); + + Assert.assertEquals(h1, w1Meta.getRows()); + Assert.assertEquals(COLS, w1Meta.getCols()); + Assert.assertEquals(COLS, wlastMeta.getRows()); + Assert.assertEquals(h1, wlastMeta.getCols()); + Assert.assertEquals(ROWS, hiddenMeta.getRows()); + Assert.assertEquals(h3, hiddenMeta.getCols()); + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinAutoencoderGeneralizedTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinAutoencoderGeneralizedTest.java new file mode 100644 index 00000000000..344a25667a7 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinAutoencoderGeneralizedTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysds.test.functions.builtin.part1; + +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.junit.Assert; +import org.junit.Test; + + +public class BuiltinAutoencoderGeneralizedTest extends AutomatedTestBase { + private static final String TEST_NAME = "autoencoderGeneralized"; + private static final String TEST_DIR = "functions/builtin/"; + private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinAutoencoderGeneralizedTest.class.getSimpleName() + "/"; + + private static final int ROWS = 128; + private static final int COLS = 64; + private static final double DENSE = 0.9; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, + new String[] { "W1_out", "Wlast_out", "hidden_out" })); + } + + @Test + public void testAutoencoderThreeLayerOutputs() { + runAutoencoderTest("DEFAULTSERVER", 16, 8, 4, 2, DENSE); + } + + @Test + public void testAutoencoderTwoLayerOutputs() { + runAutoencoderTest("DEFAULTSERVER", 16, 8, 0, 0, DENSE); + } + + @Test + public void testAutoencoderSingleLayerOutputs() { + runAutoencoderTest("DEFAULTSERVER", 16, 0, 0, 1, DENSE); + } + + @Test + public void testAutoencoderSparseInputOutputs() { + runAutoencoderTest("DEFAULTSERVER", 32, 16, 8, 2, 0.2); + } + + @Test + public void testAutoencoderParamservOutputs() { + runAutoencoderTest("PARAMSERVER", 16, 8, 4, 2, DENSE); + } + private void runAutoencoderTest(String method, int h1, int h2, int h3, int maxEpochs, double sparsity) { + int expectedHidden = h3 > 0 ? h3 : (h2 > 0 ? 
h2 : h1); + loadTestConfiguration(getTestConfiguration(TEST_NAME)); + + String home = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = home + TEST_NAME + ".dml"; + programArgs = new String[] { "-nvargs", + "X=" + input("X"), + "H1=" + h1, "H2=" + h2, "H3=" + h3, + "EPOCH=" + maxEpochs, "BATCH=" + 32, + "STEP=" + 1e-4, "DECAY=" + 0.95, "MOMENTUM=" + 0.9, + "METHOD=" + method, "MODE=LOCAL", "UTYPE=BSP", + "FREQ=BATCH", "WORKERS=1", "SCHEME=DISJOINT_RANDOM", + "NBATCHES=0", "MODELAVG=FALSE", + "W1_out=" + output("W1_out"), + "Wlast_out=" + output("Wlast_out"), + "hidden_out=" + output("hidden_out") + }; + + double[][] X = getRandomMatrix(ROWS, COLS, 0, 1, sparsity, 42); + writeInputMatrixWithMTD("X", X, true); + + runTest(true, false, null, -1); + + MatrixCharacteristics w1Meta = readDMLMetaDataFile("W1_out"); + MatrixCharacteristics wlastMeta = readDMLMetaDataFile("Wlast_out"); + MatrixCharacteristics hiddenMeta = readDMLMetaDataFile("hidden_out"); + + Assert.assertEquals(h1, w1Meta.getRows()); + Assert.assertEquals(COLS, w1Meta.getCols()); + Assert.assertEquals(COLS, wlastMeta.getRows()); + Assert.assertEquals(h1, wlastMeta.getCols()); + Assert.assertEquals(ROWS, hiddenMeta.getRows()); + Assert.assertEquals(expectedHidden, hiddenMeta.getCols()); + } +} \ No newline at end of file diff --git a/src/test/scripts/functions/builtin/autoDataGen.dml b/src/test/scripts/functions/builtin/autoDataGen.dml new file mode 100644 index 00000000000..a9b52f2f926 --- /dev/null +++ b/src/test/scripts/functions/builtin/autoDataGen.dml @@ -0,0 +1,15 @@ +# gen_X +setwd("."); + +rows = as.integer($ROWS) +cols = as.integer($COLS) + +# Dense random in [0,1) +X = rand(rows=rows, cols=cols, min=0, max=1) + +# make it more “autoencoder-like” noisy reconstruction task +# noise = rand(rows=rows, cols=cols, min=-0.05, max=0.05) +# X = pmin(pmax(X + noise, 0), 1) + +# Write as SystemDS binary fast + safe +write(X, $OUT, format="binary") \ No newline at end of file diff --git a/src/test/scripts/functions/builtin/autoGradientCheck.dml b/src/test/scripts/functions/builtin/autoGradientCheck.dml new file mode 100644 index 00000000000..4160eef0c67 --- /dev/null +++ b/src/test/scripts/functions/builtin/autoGradientCheck.dml @@ -0,0 +1,121 @@ +# autoGradientCheck.dml +# Usage: +# systemds autoGradientCheck.dml -exec singlenode -stats -nvargs \ +# X=/path/to/X.bin H1=16 H2=8 H3=4 LAYER=6 R=1 C=1 EPS=1e-4 SEED=123 + +source("scripts/builtin/autoencoder_2layer.dml") as ae + +# ------------------------- +# Inputs +# ------------------------- +X = read($X) +n = nrow(X) +m = ncol(X) + +h1 = as.integer($H1) +h2 = as.integer($H2) +h3 = as.integer($H3) + +target_layer = as.integer($LAYER) # 1..layer_count +r = as.integer($R) +c = as.integer($C) +eps = as.double($EPS) + +# Make SEED required for determinism (pass SEED=123 on command line) +#seed = as.integer($SEED) +#setseed(seed) + +# ------------------------- +# hidden_layers list +# ------------------------- +hidden_layers = list(h1) +if(h2 > 0) hidden_layers = append(hidden_layers, list(h2)) +if(h3 > 0) hidden_layers = append(hidden_layers, list(h3)) + +# ------------------------- +# Same preprocessing as training: permute + zscore +# ------------------------- +order_rand = Rand(rows=n, cols=1, min=0, max=1, pdf="uniform") +permut = table( + seq(1, n, 1), + order(target=order_rand, by=1, index.return=TRUE), + n, n +) + +X_pre = permut %*% X +means = colSums(X_pre)/n +stds = sqrt((colSums(X_pre^2)/n - means*means)*n/(n-1)) + 1e-17 +X_pre = (X_pre - means)/stds + +# ------------------------- 
+# Init model +# ------------------------- +layer_sizes = ae::build_layer_sizes(m, hidden_layers) +layer_count = length(layer_sizes) - 1 + +if(target_layer < 1 | target_layer > layer_count) + stop("LAYER must be in [1," + layer_count + "]") + +[W_list0, b_list0, updW0, updb0] = ae::init_layers(layer_sizes) + +# ------------------------- +# Analytic gradient +# ------------------------- +[act0, primes0, Yhat0, E0] = ae::forward_pass_layers(X_pre, W_list0, b_list0, X_pre) +[grads_W, grads_b] = ae::backward_pass_layers(X_pre, act0, primes0, W_list0, E0) + +# grads_W[1] is last layer, so map target_layer -> idx +idx = layer_count - target_layer + 1 +G = as.matrix(grads_W[idx]) +g_analytic = as.double(as.scalar(G[r, c])) + +# ------------------------- +# Helper: replace one layer matrix in W_list +# ------------------------- +replace_layer = function(list[unknown] W_list, Integer k, Matrix[Double] Wnew) + return(list[unknown] W_out) +{ + W_out = list() + for(i in 1:length(W_list)) { + if(i == k) W_out = append(W_out, list(Wnew)) + else W_out = append(W_out, list(as.matrix(W_list[i]))) + } +} + +# ------------------------- +# Finite differences for W[target_layer][r,c] +# ------------------------- +Wk = as.matrix(W_list0[target_layer]) +nr = nrow(Wk); nc = ncol(Wk) + +if(r < 1 | r > nr | c < 1 | c > nc) + stop("R,C out of bounds for W. W is " + nr + "x" + nc) + +# sparse D with eps at (r,c) +ri = matrix(as.double(r), rows=1, cols=1) +ci = matrix(as.double(c), rows=1, cols=1) +vi = matrix(eps, rows=1, cols=1) +D = table(ri, ci, vi, nr, nc) + +W_plus = Wk + D +W_minus = Wk - D + +W_list_plus = replace_layer(W_list0, target_layer, W_plus) +W_list_minus = replace_layer(W_list0, target_layer, W_minus) + +# IMPORTANT: no "_" throwaway vars in DML +[act_p, primes_p, Yhat_p, E_plus] = ae::forward_pass_layers(X_pre, W_list_plus, b_list0, X_pre) +[act_m, primes_m, Yhat_m, E_minus] = ae::forward_pass_layers(X_pre, W_list_minus, b_list0, X_pre) + +obj_plus = ae::obj(E_plus) +obj_minus = ae::obj(E_minus) + +g_numeric = (obj_plus - obj_minus) / (2 * eps) + +den = max(1e-12, abs(g_numeric) + abs(g_analytic)) +rel_err = abs(g_numeric - g_analytic) / den + +print("GRADCHECK layer=" + target_layer + " r=" + r + " c=" + c + " eps=" + eps) +print(" analytic = " + g_analytic) +print(" numeric = " + g_numeric) +print(" rel_err = " + rel_err) diff --git a/src/test/scripts/functions/builtin/autoencoderGeneralized.dml b/src/test/scripts/functions/builtin/autoencoderGeneralized.dml new file mode 100644 index 00000000000..1c84050d65b --- /dev/null +++ b/src/test/scripts/functions/builtin/autoencoderGeneralized.dml @@ -0,0 +1,110 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- +#------------------------------------------------------------- +# Apache SystemDS - autoencoder wrapper +# Prints stable BEFORE/AFTER objective lines for parsing +# and optionally writes outputs (skips if /dev/null). +#------------------------------------------------------------- +source("scripts/builtin/autoencoder_2layer.dml") as ae + +# --------- read inputs --------- +X = read($X) + +# Build encoder hidden layer list +hidden_layers = list(as.integer($H1)) +if(as.integer($H2) > 0) + hidden_layers = append(hidden_layers, list(as.integer($H2))) +if(as.integer($H3) > 0) + hidden_layers = append(hidden_layers, list(as.integer($H3))) + +# --------- compute objective BEFORE training --------- +# I compute it on a standardized + permuted copy, consistent with training. +# Create a fixed permutation seed via order_rand so both pre and training match. +n = nrow(X) +order_rand = Rand(rows=n, cols=1, min=0, max=1, pdf="uniform") +permut = table( + seq(1, n, 1), + order(target=order_rand, by=1, index.return=TRUE), + n, n +) +X_pre = permut %*% X + +means = colSums(X_pre) / n +stds = sqrt((colSums(X_pre^2)/n - means*means) * n/(n-1)) + 1e-17 +X_pre = (X_pre - means) / stds + +# Initialize a model once which same initializer used by training +layer_sizes = ae::build_layer_sizes(ncol(X_pre), hidden_layers) +[W_list0, b_list0, updW0, updb0] = ae::init_layers(layer_sizes) + +# Forward + objective +[act0, primes0, Yhat0, E0] = ae::forward_pass_layers(X_pre, W_list0, b_list0, X_pre) +obj_before = ae::obj(E0) +print("WRAPPER FULL OBJ BEFORE TRAIN = " + obj_before) + +# --------- train --------- +# IMPORTANT: pass order_rand so training uses same permutation as X_pre +[model, hidden] = ae::m_autoencoder( + X=X, + hidden_layers=hidden_layers, + max_epochs=as.integer($EPOCH), + full_obj=as.boolean($FULL_OBJ), + batch_size=as.integer($BATCH), + step=as.double($STEP), + decay=as.double($DECAY), + mu=as.double($MOMENTUM), + method=$METHOD, + mode=$MODE, + utype=$UTYPE, + freq=$FREQ, + k=as.integer($WORKERS), + scheme=$SCHEME, + nbatches=as.integer($NBATCHES), + modelAvg=as.boolean($MODELAVG), + order_rand=order_rand +) + +# --------- compute objective AFTER training on SAME X_pre --------- +encoder_layers = length(hidden_layers) +layer_count = 2 * encoder_layers + +W_list1 = list() +b_list1 = list() + +# model layout in generalized code path is: +# [W1..WL, b1..bL, updW1..updWL, updb1..updbL] +for(i in 1:layer_count) + W_list1 = append(W_list1, list(as.matrix(model[i]))) +for(i in 1:layer_count) + b_list1 = append(b_list1, list(as.matrix(model[layer_count + i]))) + +[act1, primes1, Yhat1, E1] = ae::forward_pass_layers(X_pre, W_list1, b_list1, X_pre) +obj_after = ae::obj(E1) +print("WRAPPER FULL OBJ AFTER TRAIN = " + obj_after) + +# --------- optional outputs skip if /dev/null --------- +W1_out = as.matrix(model[1]) +Wlast_out = as.matrix(model[layer_count]) +hidden_out = hidden + +if($W1_out != "/dev/null") write(W1_out, $W1_out) +if($Wlast_out != "/dev/null") write(Wlast_out, $Wlast_out) +if($hidden_out != "/dev/null") write(hidden_out, $hidden_out) diff --git a/src/test/scripts/functions/builtin/autoencoder_2layer.dml b/src/test/scripts/functions/builtin/autoencoder_2layer.dml index 05574cb0201..1993680ab9f 100644 --- a/src/test/scripts/functions/builtin/autoencoder_2layer.dml +++ b/src/test/scripts/functions/builtin/autoencoder_2layer.dml @@ -19,17 +19,848 @@ # #------------------------------------------------------------- -[W1, b1, W2, b2, 
W3, b3, W4, b4, hidden] = autoencoder_2layer(X = read($X), W1_rand = read($W1_rand), W2_rand = read($W2_rand), - W3_rand = read($W3_rand), W4_rand = read($W4_rand), order_rand = read($order_rand), - num_hidden1 = $H1, num_hidden2 = $H2, max_epochs = $EPOCH, full_obj = $OBJ, - batch_size = $BATCH, step = $STEP, decay = $DECAY, mu = $MOMENTUM) - -write(W1, $W1_out); -write(b1, $b1_out); -write(W2, $W2_out); -write(b2, $b2_out); -write(W3, $W3_out); -write(b3, $b3_out); -write(W4, $W4_out); -write(b4, $b4_out); -write(hidden, $hidden_out) \ No newline at end of file +# ============================================================================= +# Autoencoder 2-layer + generalized with Parameter Server training +# Trains a 2-layer autoencoder with minibatch SGD and step-size decay. +# If invoked with H1 > H2 then it becomes a 'bowtie' structured autoencoder +# Weights are initialized using Glorot & Bengio (2010) AISTATS initialization. +# The script standardizes the input before training (can be turned off). +# Also, it randomly reshuffles rows before training. +# Currently, tanh is set to be the activation function. +# By re-implementing 'func' DML-bodied function, one can change the activation. +# +# INPUT: +# --------------------------------------------------------------------------------------------- +# X Filename where the input is stored +# num_hidden1 Number of neurons in the 1st hidden layer +# num_hidden2 Number of neurons in the 2nd hidden layer +# max_epochs Number of epochs to train for +# full_obj If TRUE, Computes objective function value (squared-loss) +# at the end of each epoch. Note that, computing the full +# objective can take a lot of time. +# batch_size Mini-batch size training parameter +# step Initial step size training parameter +# decay Decays step size after each epoch training parameter +# mu Momentum parameter training parameter +# method Training method: DEFAULTSERVER or PARAMSERVER +# mode Paramserv mode: LOCAL or DISTRIBUTED (REMOTE_SPARK) # TODO: distributed needs to be implemented +# utype Paramserv update type (e.g., BSP, ASP) #TODO: Add support for ASP +# freq Paramserv update frequency (e.g., BATCH, EPOCH) +# k Number of workers +# scheme Data partitioning scheme for paramserv +# nbatches Optional number of batches per epoch for paramserv +# modelAvg Optional boolean parameter to select between updating or averaging the model in paramserver side +# W1_rand Weights might be initialized via input matrices +# W2_rand --- +# W3_rand --- +# W4_rand --- +# --------------------------------------------------------------------------------------------- +# +# OUTPUT: +# ---------------------------------------------------------------------------------------------------- +# W1_out Matrix storing weights between input layer and 1st hidden layer +# b1_out Matrix storing bias between input layer and 1st hidden layer +# W2_out Matrix storing weights between 1st hidden layer and 2nd hidden layer +# b2_out Matrix storing bias between 1st hidden layer and 2nd hidden layer +# W3_out Matrix storing weights between 2nd hidden layer and 3rd hidden layer +# b3_out Matrix storing bias between 2nd hidden layer and 3rd hidden layer +# W4_out Matrix storing weights between 3rd hidden layer and output layer +# b4_out Matrix storing bias between 3rd hidden layer and output layer +# HIDDEN Matrix storing the hidden (2nd) layer representation if needed +# ---------------------------------------------------------------------------------------------------- + +# 
----------------------------------------------------------------------------- +# Generalized autoencoder entry point (arbitrary encoder depth) +# ----------------------------------------------------------------------------- +m_autoencoder = function(Matrix[Double] X, list[unknown] hidden_layers, Integer max_epochs, + Boolean full_obj = FALSE, Integer batch_size = 256, Double step = 1e-5, Double decay = 0.95, Double mu = 0.9, + String method = "DEFAULTSERVER", String mode = "LOCAL", String utype = "BSP", String freq = "BATCH", + Integer k = 1, String scheme = "DISJOINT_RANDOM", Integer nbatches = 0, Boolean modelAvg = FALSE, + Matrix[Double] order_rand = matrix(0, rows=0, cols=0)) + return(list[unknown] model, Matrix[Double] reordered_H) +{ + # hidden_layers defines encoder sizes e.g., list(128, 64). + # Decoder mirrors encoder automatically. + n = nrow(X) + m = ncol(X) + #randomly reordering rows + if(nrow(order_rand) == 0 & ncol(order_rand) == 0) + permut = table(seq(1,n,1), order(target=Rand(rows=n, cols=1, min=0, max=1, pdf="uniform"), by=1, index.return=TRUE), n, n) + else + permut = table(seq(1,n,1), order(target=order_rand, by=1, index.return=TRUE), n, n) + X = permut %*% X + + #z-transform, whitening operator is better + means = colSums(X)/n + stds = sqrt((colSums(X^2)/n - means*means)*n/(n-1)) + 1e-17 + X = (X - means)/stds + + # Build full layer sizes including mirrored decoder and output layer. + layer_sizes = build_layer_sizes(m, hidden_layers) + layer_count = length(layer_sizes) - 1 + # Initialize weights, biases, and momentum buffers. + [W_list, b_list, upd_W_list, upd_b_list] = init_layers(layer_sizes) + + if(method == "DEFAULTSERVER") { + # Single-process training loop with full control over epochs/batches. + [W_list, b_list, upd_W_list, upd_b_list, reordered_H] = train_default_layers( + X, permut, W_list, b_list, upd_W_list, upd_b_list, + hidden_layers, max_epochs, full_obj, batch_size, step, decay, mu) + } + else if(method == "PARAMSERVER") { + # Parameter server training via paramserv builtin. + [W_list, b_list, upd_W_list, upd_b_list, reordered_H] = train_paramserv_layers( + X, permut, W_list, b_list, upd_W_list, upd_b_list, hidden_layers, layer_count, + max_epochs, batch_size, step, mu, mode, utype, freq, k, scheme, nbatches, modelAvg) + } + else { + stop("Unsupported method: " + method + ". Use DEFAULTSERVER or PARAMSERVER.") + } + + # Return flattened model list for compatibility with paramserv conventions. 
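+  # Layout: [W1..WL, b1..bL, updW1..updWL, updb1..updbL] with L = layer_count;
+  # e.g., hidden_layers=list(16,8,4) yields L=6 and a model list of 24 matrices.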
+ model = flatten_model(W_list, b_list, upd_W_list, upd_b_list) +} + +# ----------------------------------------------------------------------------- +# Legacy 2-layer autoencoder entry point +# ----------------------------------------------------------------------------- +m_autoencoder_2layer = function(Matrix[Double] X, Integer num_hidden1, Integer num_hidden2, Integer max_epochs, + Boolean full_obj = FALSE, Integer batch_size = 256, Double step = 1e-5, Double decay = 0.95, Double mu = 0.9, + String method = "DEFAULTSERVER", String mode = "LOCAL", String utype = "BSP", String freq = "BATCH", + Integer k = 1, String scheme = "DISJOINT_RANDOM", Integer nbatches = 0, Boolean modelAvg = FALSE, + Matrix[Double] W1_rand = matrix(0, rows=0, cols=0), Matrix[Double] W2_rand = matrix(0, rows=0, cols=0), + Matrix[Double] W3_rand = matrix(0, rows=0, cols=0), Matrix[Double] W4_rand = matrix(0, rows=0, cols=0), + Matrix[Double] order_rand = matrix(0, rows=0, cols=0)) + return(Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, Matrix[Double] reordered_H) +{ + n = nrow(X) + m = ncol(X) + #randomly reordering rows + if(nrow(order_rand) == 0 & ncol(order_rand) == 0) + permut = table(seq(1,n,1), order(target=Rand(rows=n, cols=1, min=0, max=1, pdf="uniform"), by=1, index.return=TRUE), n, n) + else + permut = table(seq(1,n,1), order(target=order_rand, by=1, index.return=TRUE), n, n) + X = permut %*% X + + #z-transform, whitening operator is better + means = colSums(X)/n + stds = sqrt((colSums(X^2)/n - means*means)*n/(n-1)) + 1e-17 + X = (X - means)/stds + + if(nrow(W1_rand) == 0 & ncol(W1_rand) == 0) + W1_rand = Rand(rows=num_hidden1, cols=m, min=-1, max=1, pdf="uniform") + if(nrow(W2_rand) == 0 & ncol(W2_rand) == 0) + W2_rand = Rand(rows=num_hidden2, cols=num_hidden1, min=-1, max=1, pdf="uniform") + if(nrow(W3_rand) == 0 & ncol(W3_rand) == 0) + W3_rand = Rand(rows=num_hidden1, cols=num_hidden2, min=-1, max=1, pdf="uniform") + if(nrow(W4_rand) == 0 & ncol(W4_rand) == 0) + W4_rand = Rand(rows=m, cols=num_hidden1, min=-1, max=1, pdf="uniform") + + W1 = sqrt(6)/sqrt(m + num_hidden1) * W1_rand + b1 = matrix(0, rows=num_hidden1, cols=1) + W2 = sqrt(6)/sqrt(num_hidden1 + num_hidden2) * W2_rand + b2 = matrix(0, rows=num_hidden2, cols=1) + W3 = sqrt(6)/sqrt(num_hidden2 + num_hidden1) * W3_rand + b3 = matrix(0, rows=num_hidden1, cols=1) + W4 = sqrt(6)/sqrt(num_hidden1 + m) * W4_rand + b4 = matrix(0, rows=m, cols=1) + + if(method == "DEFAULTSERVER") { + [W1, b1, W2, b2, W3, b3, W4, b4, reordered_H] = train_default( + X, permut, W1, b1, W2, b2, W3, b3, W4, b4, + max_epochs, full_obj, batch_size, step, decay, mu) + } + else if(method == "PARAMSERVER") { + [W1, b1, W2, b2, W3, b3, W4, b4, reordered_H] = train_paramserv( + X, permut, W1, b1, W2, b2, W3, b3, W4, b4, + max_epochs, batch_size, step, mu, mode, utype, freq, k, scheme, nbatches, modelAvg) + } + else { + stop("Unsupported method: " + method + ". Use DEFAULTSERVER or PARAMSERVER.") + } +} + +# ----------------------------------------------------------------------------- +# Activation function and default: tanh +# ----------------------------------------------------------------------------- +# To use another activation, implement a DML-bodied function named 'func' +# and comment out this one. 
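+# For instance, a logistic-sigmoid variant could be dropped in as follows
+# (illustrative sketch only, not enabled in this patch):
+#   func = function(Matrix[Double] X) return(Matrix[Double] Y, Matrix[Double] Y_prime){
+#     Y = 1 / (1 + exp(-X))
+#     Y_prime = Y * (1 - Y)
+#   }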
+func = function(Matrix[Double] X) return(Matrix[Double] Y, Matrix[Double] Y_prime){ + Y = (exp(2*X) - 1)/(exp(2*X) + 1) + Y_prime = 1 - Y^2 +} + +# ----------------------------------------------------------------------------- +# Fixed 2-layer helpers +# ----------------------------------------------------------------------------- +forward_pass = function(Matrix[Double] X, Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, Matrix[Double] Y) + return(Matrix[Double] H1, Matrix[Double] H1_prime,Matrix[Double] H2, Matrix[Double] H2_prime, + Matrix[Double] H3, Matrix[Double] H3_prime, Matrix[Double] Yhat, Matrix[Double] Yhat_prime, Matrix[Double] E) +{ + H1_in = t(W1 %*% t(X) + b1) + [H1, H1_prime] = func(H1_in) + + H2_in = t(W2 %*% t(H1) + b2) + [H2, H2_prime] = func(H2_in) + + H3_in = t(W3 %*% t(H2) + b3) + [H3, H3_prime] = func(H3_in) + + Yhat_in = t(W4 %*% t(H3) + b4) + [Yhat, Yhat_prime] = func(Yhat_in) + E = Yhat - Y +} + +backward_pass = function(Matrix[Double] X, Matrix[Double] H1, Matrix[Double] H1_prime, Matrix[Double] H2, Matrix[Double] H2_prime, + Matrix[Double] H3, Matrix[Double] H3_prime, Matrix[Double] Yhat_prime, Matrix[Double] E, + Matrix[Double] W1, Matrix[Double] W2, Matrix[Double] W3, Matrix[Double] W4) + return(Matrix[Double] W1_grad, Matrix[Double] b1_grad,Matrix[Double] W2_grad, Matrix[Double] b2_grad, + Matrix[Double] W3_grad, Matrix[Double] b3_grad, Matrix[Double] W4_grad, Matrix[Double] b4_grad) +{ + #backprop + delta4 = E * Yhat_prime + delta3 = H3_prime * (delta4 %*% W4) + delta2 = H2_prime * (delta3 %*% W3) + delta1 = H1_prime * (delta2 %*% W2) + + #compute gradients + b4_grad = t(colSums(delta4)) + b3_grad = t(colSums(delta3)) + b2_grad = t(colSums(delta2)) + b1_grad = t(colSums(delta1)) + + W4_grad = t(delta4) %*% H3 + W3_grad = t(delta3) %*% H2 + W2_grad = t(delta2) %*% H1 + W1_grad = t(delta1) %*% X +} + +obj = function(Matrix[Double] E) return(Double val){ + val = 0.5 * sum(E^2) +} + +# ----------------------------------------------------------------------------- +# Fixed 2-layer training = DEFAULTSERVER + PARAMSERVER +# ----------------------------------------------------------------------------- +apply_update = function(Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, + Matrix[Double] upd_W1, Matrix[Double] upd_b1, Matrix[Double] upd_W2, Matrix[Double] upd_b2, + Matrix[Double] upd_W3, Matrix[Double] upd_b3, Matrix[Double] upd_W4, Matrix[Double] upd_b4, + Matrix[Double] W1_grad, Matrix[Double] b1_grad, Matrix[Double] W2_grad, Matrix[Double] b2_grad, + Matrix[Double] W3_grad, Matrix[Double] b3_grad, Matrix[Double] W4_grad, Matrix[Double] b4_grad, + Double step, Double mu, Double batch_rows) + return(Matrix[Double] W1_out, Matrix[Double] b1_out, Matrix[Double] W2_out, Matrix[Double] b2_out, + Matrix[Double] W3_out, Matrix[Double] b3_out, Matrix[Double] W4_out, Matrix[Double] b4_out, + Matrix[Double] upd_W1_out, Matrix[Double] upd_b1_out, Matrix[Double] upd_W2_out, Matrix[Double] upd_b2_out, + Matrix[Double] upd_W3_out, Matrix[Double] upd_b3_out, Matrix[Double] upd_W4_out, Matrix[Double] upd_b4_out) +{ + local_step = step / batch_rows + upd_W1_out = mu * upd_W1 - local_step * W1_grad + upd_b1_out = mu * upd_b1 - local_step * b1_grad + upd_W2_out = mu * upd_W2 - local_step * W2_grad + upd_b2_out = mu * upd_b2 - local_step * b2_grad + upd_W3_out = mu * upd_W3 - 
local_step * W3_grad + upd_b3_out = mu * upd_b3 - local_step * b3_grad + upd_W4_out = mu * upd_W4 - local_step * W4_grad + upd_b4_out = mu * upd_b4 - local_step * b4_grad + + W1_out = W1 + upd_W1_out + b1_out = b1 + upd_b1_out + W2_out = W2 + upd_W2_out + b2_out = b2 + upd_b2_out + W3_out = W3 + upd_W3_out + b3_out = b3 + upd_b3_out + W4_out = W4 + upd_W4_out + b4_out = b4 + upd_b4_out +} + +train_default = function(Matrix[Double] X, Matrix[Double] permut, + Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, + Integer max_epochs, Boolean full_obj, Integer batch_size, Double step, Double decay, Double mu) + return(Matrix[Double] W1_out, Matrix[Double] b1_out, Matrix[Double] W2_out, Matrix[Double] b2_out, + Matrix[Double] W3_out, Matrix[Double] b3_out, Matrix[Double] W4_out, Matrix[Double] b4_out, + Matrix[Double] reordered_H) +{ + n = nrow(X) + upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1)) + upd_b1 = matrix(0, rows=nrow(b1), cols=ncol(b1)) + upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2)) + upd_b2 = matrix(0, rows=nrow(b2), cols=ncol(b2)) + upd_W3 = matrix(0, rows=nrow(W3), cols=ncol(W3)) + upd_b3 = matrix(0, rows=nrow(b3), cols=ncol(b3)) + upd_W4 = matrix(0, rows=nrow(W4), cols=ncol(W4)) + upd_b4 = matrix(0, rows=nrow(b4), cols=ncol(b4)) + + if( full_obj ){ + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + full_o = obj(full_E) + print("EPOCHS=" + 0 + " OBJ (FULL DATA): " + full_o) + } + + iter = 0 + num_iters_per_epoch = ceil(n / batch_size) + max_iterations = max_epochs * num_iters_per_epoch + #print("num_iters_per_epoch=" + num_iters_per_epoch + " max_iterations=" + max_iterations) + beg = 1 + while( iter < max_iterations ){ + end = beg + batch_size - 1 + if(end > n) end = n + X_batch = X[beg:end,] + + [H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat, Yhat_prime, E] + = forward_pass(X_batch, W1, b1, W2, b2, W3, b3, W4, b4, X_batch) + [W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad] + = backward_pass(X_batch, H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat_prime, E, W1, W2, W3, W4) + + o = obj(E) + print("epochs=" + (iter / num_iters_per_epoch) + " BATCH beg=" + beg + " end=" + end + " obj=" + o) + + [W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4] = apply_update( + W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4, + W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad, + step, mu, nrow(X_batch)) + + iter = iter + 1 + if(end == n) beg = 1 + else beg = end + 1 + + if( iter %% num_iters_per_epoch == 0 ) step = step * decay + + if( full_obj & iter %% num_iters_per_epoch == 0 ){ + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + full_o = obj(full_E) + epochs = iter %/% num_iters_per_epoch + print("EPOCHS=" + epochs + " iter=" + iter + " OBJ (FULL DATA)=" + full_o) + } + } + + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + reordered_H = t(permut) %*% full_H2 + + W1_out = W1 + b1_out = b1 + W2_out = W2 + b2_out = b2 + W3_out = W3 + b3_out = b3 + W4_out = W4 + b4_out = b4 +} + +train_paramserv = 
function(Matrix[Double] X, Matrix[Double] permut, + Matrix[Double] W1, Matrix[Double] b1, Matrix[Double] W2, Matrix[Double] b2, + Matrix[Double] W3, Matrix[Double] b3, Matrix[Double] W4, Matrix[Double] b4, + Integer max_epochs, Integer batch_size, Double step, Double mu, String mode, + String utype, String freq, Integer k, String scheme, Integer nbatches, Boolean modelAvg) + return(Matrix[Double] W1_out, Matrix[Double] b1_out, Matrix[Double] W2_out, Matrix[Double] b2_out, + Matrix[Double] W3_out, Matrix[Double] b3_out, Matrix[Double] W4_out, Matrix[Double] b4_out, + Matrix[Double] reordered_H) +{ + upd_W1 = matrix(0, rows=nrow(W1), cols=ncol(W1)) + upd_b1 = matrix(0, rows=nrow(b1), cols=ncol(b1)) + upd_W2 = matrix(0, rows=nrow(W2), cols=ncol(W2)) + upd_b2 = matrix(0, rows=nrow(b2), cols=ncol(b2)) + upd_W3 = matrix(0, rows=nrow(W3), cols=ncol(W3)) + upd_b3 = matrix(0, rows=nrow(b3), cols=ncol(b3)) + upd_W4 = matrix(0, rows=nrow(W4), cols=ncol(W4)) + upd_b4 = matrix(0, rows=nrow(b4), cols=ncol(b4)) + + modelList = list(W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4) + params = list(step=step, mu=mu) + + ps_mode = mode + if(mode == "DISTRIBUTED") + ps_mode = "REMOTE_SPARK" + + if(nbatches > 0) { + modelList2 = paramserv(model=modelList, features=X, labels=X, + upd="./scripts/builtin/autoencoder_2layer.dml::gradients", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", + nbatches=nbatches, modelAvg=modelAvg) + } + else { + modelList2 = paramserv(model=modelList, features=X, labels=X, + upd="./scripts/builtin/autoencoder_2layer.dml::gradients", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", modelAvg=modelAvg) + } + + W1 = as.matrix(modelList2[1]) + b1 = as.matrix(modelList2[2]) + W2 = as.matrix(modelList2[3]) + b2 = as.matrix(modelList2[4]) + W3 = as.matrix(modelList2[5]) + b3 = as.matrix(modelList2[6]) + W4 = as.matrix(modelList2[7]) + b4 = as.matrix(modelList2[8]) + + [full_H1, full_H1_prime, full_H2, full_H2_prime, full_H3, full_H3_prime, full_Yhat, + full_Yhat_prime, full_E] = forward_pass(X, W1, b1, W2, b2, W3, b3, W4, b4, X) + reordered_H = t(permut) %*% full_H2 + + W1_out = W1 + b1_out = b1 + W2_out = W2 + b2_out = b2 + W3_out = W3 + b3_out = b3 + W4_out = W4 + b4_out = b4 +} + +gradients = function(list[unknown] model, + list[unknown] hyperparams, + matrix[double] features, + matrix[double] labels) + return(list[unknown] gradients) +{ + W1 = as.matrix(model[1]) + b1 = as.matrix(model[2]) + W2 = as.matrix(model[3]) + b2 = as.matrix(model[4]) + W3 = as.matrix(model[5]) + b3 = as.matrix(model[6]) + W4 = as.matrix(model[7]) + b4 = as.matrix(model[8]) + + [H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat, Yhat_prime, E] + = forward_pass(features, W1, b1, W2, b2, W3, b3, W4, b4, labels) + [W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad] + = backward_pass(features, H1, H1_prime, H2, H2_prime, H3, H3_prime, Yhat_prime, E, W1, W2, W3, W4) + + gradients = list(W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad, nrow(features)) +} + +aggregation = function(list[unknown] model, + list[unknown] hyperparams, + list[unknown] gradients) + return(list[unknown] modelResult) +{ + W1 = 
as.matrix(model[1]) + b1 = as.matrix(model[2]) + W2 = as.matrix(model[3]) + b2 = as.matrix(model[4]) + W3 = as.matrix(model[5]) + b3 = as.matrix(model[6]) + W4 = as.matrix(model[7]) + b4 = as.matrix(model[8]) + upd_W1 = as.matrix(model[9]) + upd_b1 = as.matrix(model[10]) + upd_W2 = as.matrix(model[11]) + upd_b2 = as.matrix(model[12]) + upd_W3 = as.matrix(model[13]) + upd_b3 = as.matrix(model[14]) + upd_W4 = as.matrix(model[15]) + upd_b4 = as.matrix(model[16]) + + W1_grad = as.matrix(gradients[1]) + b1_grad = as.matrix(gradients[2]) + W2_grad = as.matrix(gradients[3]) + b2_grad = as.matrix(gradients[4]) + W3_grad = as.matrix(gradients[5]) + b3_grad = as.matrix(gradients[6]) + W4_grad = as.matrix(gradients[7]) + b4_grad = as.matrix(gradients[8]) + batch_rows = as.double(as.scalar(gradients[9])) + step = as.double(as.scalar(hyperparams["step"])) + mu = as.double(as.scalar(hyperparams["mu"])) + + [W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4] = apply_update( + W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4, + W1_grad, b1_grad, W2_grad, b2_grad, W3_grad, b3_grad, W4_grad, b4_grad, + step, mu, batch_rows) + + modelResult = list(W1, b1, W2, b2, W3, b3, W4, b4, + upd_W1, upd_b1, upd_W2, upd_b2, upd_W3, upd_b3, upd_W4, upd_b4) +} + +# ----------------------------------------------------------------------------- +# Generalized autoencoder helpers to support variable depth +# ----------------------------------------------------------------------------- +build_layer_sizes = function(Integer input_dim, list[unknown] hidden_layers) + return(list[unknown] sizes) +{ + # Build encoder/decoder layer sizes as: + # [input] + hidden_layers + reverse(hidden_layers) + [input] + sizes = list(input_dim) + for(i in 1:length(hidden_layers)) { + sizes = append(sizes, list(as.integer(as.scalar(hidden_layers[i])))) + } + if(length(hidden_layers) > 1) { + for(i in (length(hidden_layers)-1):1) { + sizes = append(sizes, list(as.integer(as.scalar(hidden_layers[i])))) + } + } + sizes = append(sizes, list(input_dim)) +} + +init_layers = function(list[unknown] sizes) + return(list[unknown] W_list, list[unknown] b_list, list[unknown] upd_W_list, list[unknown] upd_b_list) +{ + # Initialize weights with Glorot scaling and zero biases/momentum buffers. 
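+  # Glorot/Xavier uniform init: W ~ Uniform(-1,1) scaled by sqrt(6 / (fan_in + fan_out)).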
+ W_list = list() + b_list = list() + upd_W_list = list() + upd_b_list = list() + for(i in 1:(length(sizes)-1)) { + in_dim = as.integer(as.scalar(sizes[i])) + out_dim = as.integer(as.scalar(sizes[i+1])) + W = Rand(rows=out_dim, cols=in_dim, min=-1, max=1, pdf="uniform") + W = sqrt(6)/sqrt(in_dim + out_dim) * W + b = matrix(0, rows=out_dim, cols=1) + W_list = append(W_list, list(W)) + b_list = append(b_list, list(b)) + upd_W_list = append(upd_W_list, list(matrix(0, rows=out_dim, cols=in_dim))) + upd_b_list = append(upd_b_list, list(matrix(0, rows=out_dim, cols=1))) + } +} + +forward_pass_layers = function(Matrix[Double] X, list[unknown] W_list, list[unknown] b_list, Matrix[Double] Y) + return(list[unknown] activations, list[unknown] primes, Matrix[Double] Yhat, Matrix[Double] E) +{ + activations = list() + primes = list() + + A_prev = X + for(i in 1:length(W_list)) { + W = as.matrix(W_list[i]) + b = as.matrix(b_list[i]) + + Z = t(W %*% t(A_prev) + b) + [A, A_prime] = func(Z) + + activations = append(activations, list(A)) + primes = append(primes, list(A_prime)) + + A_prev = A + } + + Yhat = A_prev + E = Yhat - Y +} + + +backward_pass_layers = function(Matrix[Double] X, list[unknown] activations, list[unknown] primes, + list[unknown] W_list, Matrix[Double] E) + return(list[unknown] grads_W, list[unknown] grads_b) +{ + L = length(W_list) + grads_W = list() + grads_b = list() + + delta = E * as.matrix(primes[L]) + + for(i in L:1) { + A_prev = X + if(i > 1) + A_prev = as.matrix(activations[i-1]) + + grad_W = t(delta) %*% A_prev + grad_b = t(colSums(delta)) + + grads_W = append(grads_W, list(grad_W)) + grads_b = append(grads_b, list(grad_b)) + + if(i > 1) { + W_curr = as.matrix(W_list[i]) + delta = as.matrix(primes[i-1]) * (delta %*% W_curr) + } + } +} + + +apply_update_layers = function(list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list, + list[unknown] grads_W, list[unknown] grads_b, + Double step, Double mu, Double batch_rows) + return(list[unknown] W_out, list[unknown] b_out, list[unknown] upd_W_out, list[unknown] upd_b_out) +{ + # Apply SGD with momentum updates across all layers. + W_out = list() + b_out = list() + upd_W_out = list() + upd_b_out = list() + local_step = step / batch_rows + for(i in 1:length(W_list)) { + W = as.matrix(W_list[i]) + b = as.matrix(b_list[i]) + upd_W = as.matrix(upd_W_list[i]) + upd_b = as.matrix(upd_b_list[i]) + idx = length(W_list) - i + 1 + dW = as.matrix(grads_W[idx]) + db = as.matrix(grads_b[idx]) + + upd_W = mu * upd_W - local_step * dW + upd_b = mu * upd_b - local_step * db + W = W + upd_W + b = b + upd_b + + W_out = append(W_out, list(W)) + b_out = append(b_out, list(b)) + upd_W_out = append(upd_W_out, list(upd_W)) + upd_b_out = append(upd_b_out, list(upd_b)) + } +} + +train_default_layers = function(Matrix[Double] X, Matrix[Double] permut, + list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list, + list[unknown] hidden_layers, Integer max_epochs, Boolean full_obj, + Integer batch_size, Double step, Double decay, Double mu) + return(list[unknown] W_out, list[unknown] b_out, + list[unknown] upd_W_out, list[unknown] upd_b_out, + Matrix[Double] reordered_H) +{ + # Default single-process training loop for variable-depth autoencoder. 
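+  # Per mini-batch: forward_pass_layers -> backward_pass_layers -> apply_update_layers
+  # (SGD with momentum, gradients scaled by 1/batch_rows); the step size decays by
+  # 'decay' after each full epoch.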
+ n = nrow(X) + encoder_layers = length(hidden_layers) + if( full_obj ){ + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + full_o = obj(full_E) + print("EPOCHS=" + 0 + " OBJ (FULL DATA): " + full_o) + } + + iter = 0 + num_iters_per_epoch = ceil(n / batch_size) + max_iterations = max_epochs * num_iters_per_epoch + beg = 1 + while( iter < max_iterations ){ + end = beg + batch_size - 1 + if(end > n) end = n + X_batch = X[beg:end,] + + [activations, primes, Yhat, E] = forward_pass_layers(X_batch, W_list, b_list, X_batch) + [grads_W, grads_b] = backward_pass_layers(X_batch, activations, primes, W_list, E) + + o = obj(E) + print("epochs=" + (iter / num_iters_per_epoch) + " BATCH beg=" + beg + " end=" + end + " obj=" + o) + + [W_list, b_list, upd_W_list, upd_b_list] = apply_update_layers( + W_list, b_list, upd_W_list, upd_b_list, grads_W, grads_b, step, mu, nrow(X_batch)) + + iter = iter + 1 + if(end == n) beg = 1 + else beg = end + 1 + + if( iter %% num_iters_per_epoch == 0 ) step = step * decay + + if( full_obj & iter %% num_iters_per_epoch == 0 ){ + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + full_o = obj(full_E) + epochs = iter %/% num_iters_per_epoch + print("EPOCHS=" + epochs + " iter=" + iter + " OBJ (FULL DATA)=" + full_o) + } + } + + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + # Return the encoder bottleneck representation. + hidden = as.matrix(full_act[encoder_layers]) + reordered_H = t(permut) %*% hidden + + W_out = W_list + b_out = b_list + upd_W_out = upd_W_list + upd_b_out = upd_b_list +} + +flatten_model = function(list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list) + return(list[unknown] modelList) +{ + modelList = list() + + for(i in 1:length(W_list)) + modelList = append(modelList, list(as.matrix(W_list[i]))) + + for(i in 1:length(b_list)) + modelList = append(modelList, list(as.matrix(b_list[i]))) + + for(i in 1:length(upd_W_list)) + modelList = append(modelList, list(as.matrix(upd_W_list[i]))) + + for(i in 1:length(upd_b_list)) + modelList = append(modelList, list(as.matrix(upd_b_list[i]))) +} + +unflatten_model = function(list[unknown] modelList, Integer layer_count) + return(list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list) +{ + W_list = list() + b_list = list() + upd_W_list = list() + upd_b_list = list() + + for(i in 1:layer_count) + W_list = append(W_list, list(as.matrix(modelList[i]))) + + for(i in 1:layer_count) + b_list = append(b_list, list(as.matrix(modelList[layer_count + i]))) + + for(i in 1:layer_count) + upd_W_list = append(upd_W_list, list(as.matrix(modelList[2*layer_count + i]))) + + for(i in 1:layer_count) + upd_b_list = append(upd_b_list, list(as.matrix(modelList[3*layer_count + i]))) +} + +train_paramserv_layers = function(Matrix[Double] X, Matrix[Double] permut, + list[unknown] W_list, list[unknown] b_list, + list[unknown] upd_W_list, list[unknown] upd_b_list, + list[unknown] hidden_layers, Integer layer_count, Integer max_epochs, + Integer batch_size, Double step, Double mu, String mode, + String utype, String freq, Integer k, String scheme, Integer nbatches, Boolean modelAvg) + return(list[unknown] W_out, list[unknown] b_out, + list[unknown] upd_W_out, list[unknown] upd_b_out, + Matrix[Double] reordered_H) +{ + modelList = list() + + for(i in 1:layer_count) + modelList = append(modelList, list(as.matrix(W_list[i]))) + + for(i in 
1:layer_count) + modelList = append(modelList, list(as.matrix(b_list[i]))) + + if(length(upd_W_list) == 0) { + for(i in 1:layer_count) + upd_W_list = append(upd_W_list, + list(matrix(0, rows=nrow(as.matrix(W_list[i])), cols=ncol(as.matrix(W_list[i]))))) + } + if(length(upd_b_list) == 0) { + for(i in 1:layer_count) + upd_b_list = append(upd_b_list, + list(matrix(0, rows=nrow(as.matrix(b_list[i])), cols=ncol(as.matrix(b_list[i]))))) + } + + for(i in 1:layer_count) + modelList = append(modelList, list(as.matrix(upd_W_list[i]))) + + for(i in 1:layer_count) + modelList = append(modelList, list(as.matrix(upd_b_list[i]))) + + params = list(step=step, mu=mu, layer_count=layer_count, batch_size=batch_size) + encoder_layers = length(hidden_layers) + + ps_mode = mode + if(mode == "DISTRIBUTED") + ps_mode = "REMOTE_SPARK" + + if(nbatches > 0) { + modelList2 = paramserv(model=modelList, features=X, labels=X, + upd="./scripts/builtin/autoencoder_2layer.dml::gradients_layers", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation_layers", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", + nbatches=nbatches, modelAvg=modelAvg) + } + else { + modelList2 = paramserv(model=modelList, features=X, labels=X, + upd="./scripts/builtin/autoencoder_2layer.dml::gradients_layers", + agg="./scripts/builtin/autoencoder_2layer.dml::aggregation_layers", + mode=ps_mode, utype=utype, freq=freq, epochs=max_epochs, batchsize=batch_size, + k=k, scheme=scheme, hyperparams=params, checkpointing="NONE", modelAvg=modelAvg) + } + + [W_list, b_list, upd_W_list, upd_b_list] = unflatten_model(modelList2, layer_count) + [full_act, full_primes, full_Yhat, full_E] = forward_pass_layers(X, W_list, b_list, X) + hidden = as.matrix(full_act[encoder_layers]) + reordered_H = t(permut) %*% hidden + + W_out = W_list + b_out = b_list + upd_W_out = upd_W_list + upd_b_out = upd_b_list +} + + +gradients_layers = function(list[unknown] model, + list[unknown] hyperparams, + matrix[double] features, + matrix[double] labels) + return(list[unknown] gradients) +{ + layer_count = as.integer(as.scalar(hyperparams["layer_count"])) + + W_list = list() + b_list = list() + + for(i in 1:layer_count) + W_list = append(W_list, list(as.matrix(model[i]))) + + for(i in 1:layer_count) + b_list = append(b_list, list(as.matrix(model[layer_count + i]))) + + [activations, primes, Yhat, E] = forward_pass_layers(features, W_list, b_list, labels) + [grads_W, grads_b] = backward_pass_layers(features, activations, primes, W_list, E) + + gradients = list() + for(i in 1:layer_count) + gradients = append(gradients, list(as.matrix(grads_W[layer_count - i + 1]))) + for(i in 1:layer_count) + gradients = append(gradients, list(as.matrix(grads_b[layer_count - i + 1]))) +} + + +aggregation_layers = function(list[unknown] model, + list[unknown] hyperparams, + list[unknown] gradients) + return(list[unknown] modelResult) +{ + layer_count = as.integer(as.scalar(hyperparams["layer_count"])) + step = as.double(as.scalar(hyperparams["step"])) + mu = as.double(as.scalar(hyperparams["mu"])) + batch_rows = as.double(as.scalar(hyperparams["batch_size"])) + local_step = step / batch_rows + + W_out = list() + b_out = list() + upd_W_out = list() + upd_b_out = list() + + for(i in 1:layer_count) { + W = as.matrix(model[i]) + b = as.matrix(model[layer_count + i]) + upd_W = as.matrix(model[2*layer_count + i]) + upd_b = as.matrix(model[3*layer_count + i]) + + W_grad = as.matrix(gradients[i]) + b_grad = 
as.matrix(gradients[layer_count + i]) + + upd_W = mu * upd_W - local_step * W_grad + upd_b = mu * upd_b - local_step * b_grad + + W = W + upd_W + b = b + upd_b + + W_out = append(W_out, list(W)) + b_out = append(b_out, list(b)) + upd_W_out = append(upd_W_out, list(upd_W)) + upd_b_out = append(upd_b_out, list(upd_b)) + } + + modelResult = list() + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(W_out[i]))) + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(b_out[i]))) + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(upd_W_out[i]))) + for(i in 1:layer_count) + modelResult = append(modelResult, list(as.matrix(upd_b_out[i]))) +} +
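+# Example invocation of the generalized entry point (illustrative sketch; the
+# argument values mirror the accompanying tests and are not part of the builtin):
+#   [model, H] = m_autoencoder(X=X, hidden_layers=list(16, 8, 4), max_epochs=2,
+#     batch_size=32, step=1e-4, decay=0.95, mu=0.9, method="PARAMSERVER",
+#     mode="LOCAL", utype="BSP", freq="BATCH", k=1, scheme="DISJOINT_RANDOM",
+#     nbatches=0, modelAvg=FALSE)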