Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "freenet-stdlib"
version = "0.1.40"
version = "0.2.0"
edition = "2021"
rust-version = "1.80"
publish = true
Expand All @@ -11,6 +11,7 @@ repository = "https://github.com/freenet/freenet-stdlib"
[dependencies]
arbitrary = { version = "1", optional = true, features = ["derive"] }
bincode = "1"
bytes = { version = "1", features = ["serde"] }
byteorder = "1"
blake3 = { version = "1", features = ["std", "traits-preview"] }
bs58 = "0.5"
Expand Down
3 changes: 3 additions & 0 deletions rust/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ mod browser;
#[cfg(all(target_family = "wasm", feature = "net"))]
pub use browser::*;

#[cfg(feature = "net")]
pub mod streaming;

pub use client_events::*;

#[cfg(feature = "net")]
Expand Down
126 changes: 102 additions & 24 deletions rust/src/client_api/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Connection = web_sys::WebSocket;
pub struct WebApi {
conn: Connection,
error_handler: Box<dyn FnMut(Error) + 'static>,
next_stream_id: u32,
}

impl Drop for WebApi {
Expand All @@ -32,6 +33,8 @@ impl WebApi {
{
let eh = Rc::new(RefCell::new(error_handler.clone()));
let result_handler = Rc::new(RefCell::new(result_handler));
let reassembly = Rc::new(RefCell::new(super::streaming::ReassemblyBuffer::new()));

let onmessage_callback = Closure::<dyn FnMut(_)>::new(move |e: MessageEvent| {
// Extract the Blob from the MessageEvent
let value: JsValue = e.data();
Expand All @@ -44,6 +47,7 @@ impl WebApi {
let fr_clone = file_reader.clone();
let eh_clone = eh.clone();
let result_handler_clone = result_handler.clone();
let reassembly_clone = reassembly.clone();

let onloadend_callback = Closure::<dyn FnMut()>::new(move || {
let array_buffer = fr_clone
Expand All @@ -52,6 +56,9 @@ impl WebApi {
.dyn_into::<js_sys::ArrayBuffer>()
.unwrap();
let bytes = js_sys::Uint8Array::new(&array_buffer).to_vec();

use super::client_events::HostResponse;

let response: HostResult = match bincode::deserialize(&bytes) {
Ok(val) => val,
Err(err) => {
Expand All @@ -62,7 +69,54 @@ impl WebApi {
return;
}
};
result_handler_clone.borrow_mut()(response);

match response {
Ok(HostResponse::StreamHeader { .. }) => {
// StreamHeader is metadata only — the following StreamChunks
// will be reassembled transparently by the ReassemblyBuffer.
// Browser incremental streaming is not yet supported.
return;
}
Ok(HostResponse::StreamChunk {
stream_id,
index,
total,
data,
}) => {
match reassembly_clone
.borrow_mut()
.receive_chunk(stream_id, index, total, data)
{
Ok(Some(complete)) => {
let inner: HostResult = match bincode::deserialize(&complete) {
Ok(val) => val,
Err(err) => {
eh_clone.borrow_mut()(Error::ConnectionError(
serde_json::json!({
"error": format!("{err}"),
"source": "stream reassembly deserialization"
}),
));
return;
}
};
result_handler_clone.borrow_mut()(inner);
}
Ok(None) => return, // more chunks needed
Err(e) => {
reassembly_clone.borrow_mut().remove_stream(stream_id);
eh_clone.borrow_mut()(Error::ConnectionError(serde_json::json!({
"error": format!("{e}"),
"source": "streaming reassembly"
})));
return;
}
}
}
other => {
result_handler_clone.borrow_mut()(other);
}
}
});

// Set the FileReader handlers
Expand Down Expand Up @@ -94,7 +148,6 @@ impl WebApi {
handler();
}
}) as Box<dyn FnMut()>);
// conn.add_event_listener_with_callback("open", onopen_callback.as_ref().unchecked_ref());
conn.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();

Expand All @@ -112,12 +165,15 @@ impl WebApi {
WebApi {
conn,
error_handler: Box::new(error_handler),
next_stream_id: 0,
}
}

pub async fn send(&mut self, request: ClientRequest<'static>) -> Result<(), Error> {
// Check WebSocket ready state before sending
// Per WebSocket spec, send() silently discards data when socket is CLOSING/CLOSED
use super::streaming::{chunk_request, CHUNK_THRESHOLD};

// Check WebSocket ready state before sending.
// Per WebSocket spec, send() silently discards data when socket is CLOSING/CLOSED.
let ready_state = self.conn.ready_state();
if ready_state != web_sys::WebSocket::OPEN {
let state_name = match ready_state {
Expand All @@ -137,29 +193,51 @@ impl WebApi {
}

let send = bincode::serialize(&request)?;
self.conn.send_with_u8_array(&send).map_err(|err| {
let err: serde_json::Value = match serde_wasm_bindgen::from_value(err) {
Ok(e) => e,
Err(e) => {
let e = serde_json::json!({
"error": format!("{e}"),
"origin": "request serialization",
"request": format!("{request:?}"),
});
(self.error_handler)(Error::ConnectionError(e.clone()));
return Error::ConnectionError(e);
}
};
(self.error_handler)(Error::ConnectionError(serde_json::json!({
"error": err,
"origin": "request sending",
"request": format!("{request:?}"),
})));
Error::ConnectionError(err)
})?;

if send.len() > CHUNK_THRESHOLD {
let stream_id = self.next_stream_id;
self.next_stream_id = self.next_stream_id.wrapping_add(1);
let chunks = chunk_request(send, stream_id);
for chunk in &chunks {
let chunk_bytes =
bincode::serialize(chunk).map_err(|e| Error::OtherError(e.into()))?;
self.conn
.send_with_u8_array(&chunk_bytes)
.map_err(|err| Self::map_send_error(err, &request, &mut self.error_handler))?;
}
} else {
self.conn
.send_with_u8_array(&send)
.map_err(|err| Self::map_send_error(err, &request, &mut self.error_handler))?;
}
Ok(())
}

fn map_send_error(
err: JsValue,
request: &ClientRequest<'_>,
error_handler: &mut Box<dyn FnMut(Error) + 'static>,
) -> Error {
let err: serde_json::Value = match serde_wasm_bindgen::from_value(err) {
Ok(e) => e,
Err(e) => {
let e = serde_json::json!({
"error": format!("{e}"),
"origin": "request serialization",
"request": format!("{request:?}"),
});
error_handler(Error::ConnectionError(e.clone()));
return Error::ConnectionError(e);
}
};
error_handler(Error::ConnectionError(serde_json::json!({
"error": err,
"origin": "request sending",
"request": format!("{request:?}"),
})));
Error::ConnectionError(err)
}

pub fn disconnect(self, cause: impl AsRef<str>) {
let _ = self.conn.close_with_code_and_reason(1000, cause.as_ref());
}
Expand Down
113 changes: 112 additions & 1 deletion rust/src/client_api/client_events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytes::Bytes;
use flatbuffers::WIPOffset;
use std::borrow::Cow;
use std::fmt::Display;
Expand Down Expand Up @@ -32,6 +33,7 @@ use crate::generated::host_response::{
NotFoundArgs, Ok as FbsOk, OkArgs, OutboundDelegateMsg as FbsOutboundDelegateMsg,
OutboundDelegateMsgArgs, OutboundDelegateMsgType, PutResponse as FbsPutResponse,
PutResponseArgs, RequestUserInput as FbsRequestUserInput, RequestUserInputArgs,
StreamChunk as FbsHostStreamChunk, StreamChunkArgs as FbsHostStreamChunkArgs,
UpdateNotification as FbsUpdateNotification, UpdateNotificationArgs,
UpdateResponse as FbsUpdateResponse, UpdateResponseArgs,
};
Expand Down Expand Up @@ -257,6 +259,13 @@ pub enum ClientRequest<'a> {
NodeQueries(NodeQuery),
/// Gracefully disconnect from the host.
Close,
/// A chunk of a larger streamed message.
StreamChunk {
stream_id: u32,
index: u32,
total: u32,
data: Bytes,
},
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -319,6 +328,17 @@ impl ClientRequest<'_> {
ClientRequest::Authenticate { token } => ClientRequest::Authenticate { token },
ClientRequest::NodeQueries(query) => ClientRequest::NodeQueries(query),
ClientRequest::Close => ClientRequest::Close,
ClientRequest::StreamChunk {
stream_id,
index,
total,
data,
} => ClientRequest::StreamChunk {
stream_id,
index,
total,
data,
},
}
}

Expand Down Expand Up @@ -355,7 +375,20 @@ impl ClientRequest<'_> {
token: token.to_owned(),
}
}
_ => unreachable!(),
ClientRequestType::StreamChunk => {
let chunk = client_request.client_request_as_stream_chunk().unwrap();
ClientRequest::StreamChunk {
stream_id: chunk.stream_id(),
index: chunk.index(),
total: chunk.total(),
data: Bytes::from(chunk.data().bytes().to_vec()),
}
}
_ => {
return Err(WsApiError::deserialization(
"unknown client request type".to_string(),
))
}
},
Err(e) => {
let cause = format!("{e}");
Expand Down Expand Up @@ -641,6 +674,12 @@ impl Display for ClientRequest<'_> {
ClientRequest::Authenticate { .. } => write!(f, "authenticate"),
ClientRequest::NodeQueries(query) => write!(f, "node queries: {:?}", query),
ClientRequest::Close => write!(f, "close"),
ClientRequest::StreamChunk {
stream_id,
index,
total,
..
} => write!(f, "stream chunk {index}/{total} (stream {stream_id})"),
}
}
}
Expand Down Expand Up @@ -704,6 +743,33 @@ pub enum HostResponse<T = WrappedState> {
QueryResponse(QueryResponse),
/// A requested action which doesn't require an answer was performed successfully.
Ok,
/// A chunk of a larger streamed response.
StreamChunk {
stream_id: u32,
index: u32,
total: u32,
data: Bytes,
},
/// Header message announcing the start of a streamed response.
/// Sent before the corresponding [`StreamChunk`] messages so the client
/// can set up incremental consumption via [`WsStreamHandle`].
StreamHeader {
stream_id: u32,
total_bytes: u64,
content: StreamContent,
},
}

/// Describes what kind of response is being streamed.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum StreamContent {
/// A streamed GetResponse — the large state is delivered via StreamChunks.
GetResponse {
key: ContractKey,
includes_contract: bool,
},
/// Raw binary stream (future use).
Raw,
}

type Peer = String;
Expand Down Expand Up @@ -1513,6 +1579,40 @@ impl HostResponse {
Ok(builder.finished_data().to_vec())
}
HostResponse::QueryResponse(_) => unimplemented!(),
HostResponse::StreamChunk {
stream_id,
index,
total,
data,
} => {
let data_offset = builder.create_vector(&data);
let chunk_offset = FbsHostStreamChunk::create(
&mut builder,
&FbsHostStreamChunkArgs {
stream_id,
index,
total,
data: Some(data_offset),
},
);
let host_response_offset = FbsHostResponse::create(
&mut builder,
&HostResponseArgs {
response_type: HostResponseType::StreamChunk,
response: Some(chunk_offset.as_union_value()),
},
);
finish_host_response_buffer(&mut builder, host_response_offset);
Ok(builder.finished_data().to_vec())
}
HostResponse::StreamHeader { .. } => {
// StreamHeader is only sent over bincode (Native encoding) to
// streaming-capable clients. Flatbuffers clients use transparent
// reassembly via StreamChunk only.
Err(Box::new(ClientError::from(ErrorKind::Unhandled {
cause: "StreamHeader is not supported over flatbuffers encoding".into(),
})))
}
}
}
}
Expand Down Expand Up @@ -1543,6 +1643,17 @@ impl Display for HostResponse {
HostResponse::DelegateResponse { .. } => write!(f, "delegate responses"),
HostResponse::Ok => write!(f, "ok response"),
HostResponse::QueryResponse(_) => write!(f, "query response"),
HostResponse::StreamChunk {
stream_id,
index,
total,
..
} => write!(f, "stream chunk {index}/{total} (stream {stream_id})"),
HostResponse::StreamHeader {
stream_id,
total_bytes,
..
} => write!(f, "stream header (stream {stream_id}, {total_bytes} bytes)"),
}
}
}
Expand Down
Loading