Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
394 changes: 394 additions & 0 deletions examples/observability_gateway.wave
Original file line number Diff line number Diff line change
@@ -0,0 +1,394 @@
import("std::env::cwd");
import("std::env::environ");
import("std::path::copy");
import("std::string::len");
import("std::string::trim");
import("std::string::ascii");
import("std::string::hash");
import("std::math::int");
import("std::math::bits");
import("std::math::float");
import("std::math::num");
import("std::time::clock");
import("std::time::diff");
import("std::time::sleep");
import("std::buffer::alloc");
import("std::buffer::write");
import("std::buffer::read");
import("std::mem::ops");
import("std::net::tcp");

const DEFAULT_PORT: i32 = 18080;
const ALERT_THRESHOLD: i32 = 3;
const MAX_ROUTE_SCORE: i32 = 100;

static GLOBAL_EVENT_SEQ: i64 = 0;

type TenantId = i64;

enum Severity -> i32 {
INFO = 1,
WARN = 2,
ERROR = 3
}

#[target(os="linux")]
fun default_data_root() -> str {
return "/var/lib/wave";
}

#[target(os="macos")]
fun default_data_root() -> str {
return "/usr/local/var/wave";
}

struct KeyValue<K, V> {
key: K;
value: V;
}

struct Window<T> {
at_ns: i64;
data: T;
}

struct ApiResult<T> {
ok: bool;
data: T;
}

struct RequestCtx {
tenant: TenantId;
method: str;
route: str;
}

struct Metrics {
total: i32;
info: i32;
warn: i32;
error: i32;
}

struct IngestRecord {
seq: i64;
ctx: RequestCtx;
severity: Severity;
message: str;
took_ns: i64;
route_score: i32;
route_hash: i64;
}

proto Metrics {
fun record(self: Metrics, sev: Severity) -> Metrics {
var next: Metrics = self;
next.total += 1;

if (sev == INFO) {
next.info += 1;
} else if (sev == WARN) {
next.warn += 1;
} else {
next.error += 1;
}

return next;
}

fun is_alert(self: Metrics, threshold: i32) -> bool {
if (self.error >= threshold) {
return true;
}
return false;
}
}

proto RequestCtx {
fun is_api(self: RequestCtx) -> bool {
return is_api_route(self.route);
}
}

fun identity<T>(x: T) -> T {
return x;
}

fun choose<T>(left: T, right: T, pick_left: bool) -> T {
if (pick_left) {
return left;
}
return right;
}

fun make_kv<K, V>(k: K, v: V) -> KeyValue<K, V> {
var pair_value: KeyValue<K, V>;
pair_value.key = k;
pair_value.value = v;
return pair_value;
}

fun wrap_window<T>(data: T, at_ns: i64) -> Window<T> {
var w: Window<T>;
w.at_ns = at_ns;
w.data = data;
return w;
}

fun ok<T>(payload: T) -> ApiResult<T> {
var r: ApiResult<T>;
r.ok = true;
r.data = payload;
return r;
}

fun is_api_route(route: str) -> bool {
var n: i32 = len(route);
if (n < 5) {
return false;
}

if (route[0] != 47) {
return false;
}

if (to_lower(route[1]) != 97) {
return false;
}

if (to_lower(route[2]) != 112) {
return false;
}

if (to_lower(route[3]) != 105) {
return false;
}

if (route[4] != 47) {
return false;
}

return true;
}

fun parse_severity(line: str) -> Severity {
var i: i32 = trim_left_index(line);
var c: u8 = to_upper(line[i]);

if (c == 69) {
return ERROR;
}

if (c == 87) {
return WARN;
}

return INFO;
}

fun score_route(route: str) -> i32 {
var n: i32 = len(route);
var score: i32 = clamp(n * 2, 0, MAX_ROUTE_SCORE);

if (path_has_ext(route)) {
score += 12;
}

if (!path_is_abs(route)) {
score += 4;
}

var ext_start: i32 = path_ext_start(route);
if (ext_start >= 0) {
var tail: i32 = n - ext_start;
score += min(tail * 3, 20);
}

score += popcount(n);
score = align_up(score, 4);
score = clamp(score, 0, MAX_ROUTE_SCORE);

return score;
}

fun build_log_frame(ctx: RequestCtx, sev: Severity) -> i64 {
var frame: Buffer = buffer_new_default();

var tenant_kv: KeyValue<str, TenantId> = make_kv<str, TenantId>("tenant", ctx.tenant);

buffer_append_str(&frame, tenant_kv.key);
buffer_push(&frame, 61);

if (sev == ERROR) {
buffer_append_str(&frame, "ERROR");
} else if (sev == WARN) {
buffer_append_str(&frame, "WARN");
} else {
buffer_append_str(&frame, "INFO");
}

buffer_push(&frame, 124);
buffer_append_str(&frame, ctx.route);

var first_byte: u8 = buffer_at(frame, 0);

var a: array<u8, 4>;
var b: array<u8, 4>;

mem_set(&a[0], first_byte, 4);
mem_copy(&b[0], &a[0], 4);

var ok_mem: bool = (mem_cmp(&a[0], &b[0], 4) == 0);
var used: i64 = frame.len;

if (!ok_mem) {
used = -1;
}

buffer_free(&frame);
return used;
}

fun ingest_one(
ctx: RequestCtx,
line: str,
base_metrics: Metrics
) -> ApiResult<Window<IngestRecord>> {
var start_ts: TimeSpec;
time_now_monotonic(&start_ts);

GLOBAL_EVENT_SEQ += 1;

var sev: Severity = parse_severity(line);
var next_metrics: Metrics = base_metrics.record(sev);

var end_ts: TimeSpec;
time_now_monotonic(&end_ts);

var took_ns: i64 = time_diff_ns(start_ts, end_ts);

var rec: IngestRecord;
rec.seq = GLOBAL_EVENT_SEQ;
rec.ctx = ctx;
rec.severity = sev;
rec.message = line;
rec.took_ns = took_ns;
rec.route_score = score_route(ctx.route);
rec.route_hash = fnv1a_64(ctx.route);

var wrapped: Window<IngestRecord> = wrap_window<IngestRecord>(
rec,
time_now_monotonic_ns()
);

if (next_metrics.is_alert(ALERT_THRESHOLD)) {
println("ALERT tenant={} errors={}", ctx.tenant, next_metrics.error);
}

return ok<Window<IngestRecord>>(wrapped);
}

fun bootstrap_runtime() {
var cwd_buf: array<u8, 512>;
var cwd_len: i64 = env_getcwd(&cwd_buf[0], 512);

var env_port_raw: array<u8, 64>;
var env_port_len: i64 = env_get("WAVE_PORT", &env_port_raw[0], 64);

var configured_port: i32 = env_get_i32_default("WAVE_PORT", DEFAULT_PORT);
var port: i32 = clamp(configured_port, 1024, 65535);

var listener: TcpListener = tcp_bind(port as i16);
tcp_close_listener(listener);

var root: str = default_data_root();

var log_path_buf: array<u8, 256>;
var base_buf: array<u8, 128>;
var dir_buf: array<u8, 128>;

var log_path_len: i32 = path_join2(&log_path_buf[0], 256, root, "spool/events.log");
var base_len: i32 = path_basename_copy(&base_buf[0], 128, root);
var dir_len: i32 = path_dirname_copy(&dir_buf[0], 128, root);

println("platform root: {}", root);
println("cwd length: {}", cwd_len);
println("env WAVE_PORT len: {}", env_port_len);
println("log path bytes: {}", log_path_len);
println("base len: {} dirname len: {}", base_len, dir_len);
println("has HOME? {}", env_exists("HOME"));
}

fun main() {
bootstrap_runtime();

var metrics: Metrics = Metrics {
total: 0,
info: 0,
warn: 0,
error: 0
};

var ctx: RequestCtx = RequestCtx {
tenant: 42,
method: "POST",
route: "/api/v1/orders/create.csv"
};

if (ctx.is_api()) {
println("api route accepted: {}", ctx.route);
}

var r1: ApiResult<Window<IngestRecord>> = ingest_one(
ctx,
"WARN order latency reached 240ms",
metrics
);

metrics = metrics.record(r1.data.data.severity);

var r2: ApiResult<Window<IngestRecord>> = ingest_one(
ctx,
"ERROR payment provider timeout",
metrics
);

metrics = metrics.record(r2.data.data.severity);

var normalized_score: i32 = identity<i32>(r2.data.data.route_score);
var stronger_score: i32 = choose<i32>(normalized_score, 64, normalized_score > 64);

var jitter_raw: i32 = abs((r2.data.data.took_ns % 97) as i32);
var jitter_aligned: i32 = align_down(jitter_raw + 7, 4);
var jitter_factor: f32 = clamp_f32((jitter_aligned as f32) / 10.0, 0.1, 10.0);

var frame_size: i64 = build_log_frame(ctx, r2.data.data.severity);
var shard: i32 = ilog2_ceil(max(1, stronger_score));
var bucket: i32 = gcd((r2.data.data.route_hash % 1000) as i32, 360);

println(
"seq={} score={} frame={} took={}ns",
r2.data.data.seq,
stronger_score,
frame_size,
r2.data.data.took_ns
);

println(
"hash={} shard={} bucket={} jitter={}",
r2.data.data.route_hash,
shard,
bucket,
jitter_factor
);

println(
"metrics total={} info={} warn={} error={}",
metrics.total,
metrics.info,
metrics.warn,
metrics.error
);

time_sleep_ms(1);
}
Loading