Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ Breaking changes in this release:
- Added pull-based capabilities system for dynamically discovering adapter capabilities at runtime, in PR [#5679](https://github.com/microsoft/BotFramework-WebChat/pull/5679), by [@pranavjoshi001](https://github.com/pranavjoshi001)
- Added Speech-to-Speech (S2S) support for real-time voice conversations, in PR [#5654](https://github.com/microsoft/BotFramework-WebChat/pull/5654), by [@pranavjoshi](https://github.com/pranavjoshi001)
- Added core mute/unmute functionality for speech-to-speech via `useRecorder` hook (silent chunks keep server connection alive), in PR [#5688](https://github.com/microsoft/BotFramework-WebChat/pull/5688), by [@pranavjoshi](https://github.com/pranavjoshi001)
- 🧪 Added incremental streaming Markdown renderer for livestreaming, in PR [#5799](https://github.com/microsoft/BotFramework-WebChat/pull/5799), by [@OEvgeny](https://github.com/OEvgeny)

### Changed

Expand Down
236 changes: 236 additions & 0 deletions __tests__/assets/custom-element/event-stream-adapter.ce.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Event Stream Adapter Custom Element</title>
</head>
<body>
<script type="module">
import { customElement } from '/assets/custom-element/custom-element.js';
import { getActivityLivestreamingMetadata } from 'botframework-webchat-core';
import { EventSourceParserStream } from 'eventsource-parser/stream';
import createStreamCoalescer from '/assets/esm/adapter/createStreamCoalescer.js';
import { forIterator } from '/assets/esm/adapter/demuxChainOfThought.js';

function createObservable(subscribe) {
return Object.freeze({ subscribe });
}

customElement('event-stream-adapter', currentDocument =>
class EventStreamAdapterElement extends HTMLElement {
static get observedAttributes() { return ['compat', 'href']; }

#abortController = null;
#activities = [];
#activityBuffer = [];
#activityObservers = [];
#activityWaiters = new Set();
#directLine;

constructor() {
super();

const self = this;

this.#directLine = Object.freeze({
activity$: createObservable(observerOrNext => {
const observer = typeof observerOrNext === 'function'
? { next: observerOrNext }
: observerOrNext;

for (const activity of self.#activityBuffer) {
observer.next(activity);
}

self.#activityObservers.push(observer);

return Object.freeze({
unsubscribe() {
const i = self.#activityObservers.indexOf(observer);
i !== -1 && self.#activityObservers.splice(i, 1);
}
});
}),

connectionStatus$: createObservable(observerOrNext => {
const observer = typeof observerOrNext === 'function'
? { next: observerOrNext }
: observerOrNext;

observer.next(0);
observer.next(1);
observer.next(2);

return Object.freeze({ unsubscribe() {} });
}),

end() {},

postActivity() {
return createObservable(observerOrNext => {
const observer = typeof observerOrNext === 'function'
? { next: observerOrNext }
: observerOrNext;

observer.next(crypto.randomUUID());
observer.complete?.();

return Object.freeze({ unsubscribe() {} });
});
}
});
}

get compat() {
return this.getAttribute('compat') || 'webchat';
}

get directLine() {
return this.#directLine;
}

connectedCallback() {
this.#load();
}

disconnectedCallback() {
this.#abortController?.abort();
this.#abortController = null;
}

#emitToAdapter(activity) {
this.#activityBuffer.push(activity);

for (const observer of this.#activityObservers) {
observer.next(activity);
}
}

#emitEvent(activity) {
this.#activities.push(activity);

for (const resolve of this.#activityWaiters) {
resolve();
}

this.#activityWaiters.clear();

const meta = getActivityLivestreamingMetadata(activity);
const element = this;
const detail = { activity };

meta && Object.defineProperty(detail, 'activities', {
enumerable: true,
get: () => element.#iterateStreamActivities(meta.sessionId)
});

this.dispatchEvent(new CustomEvent('activity', {
bubbles: true,
detail: Object.freeze(detail)
}));
}

async *#iterateStreamActivities(sessionId) {
let cursor = 0;

for (;;) {
while (cursor < this.#activities.length) {
const activity = this.#activities[cursor++];
const activityMeta = getActivityLivestreamingMetadata(activity);

if (!activityMeta || activityMeta.sessionId !== sessionId) {
continue;
}

yield activity;

if (activityMeta.type === 'final activity') {
return;
}
}

await new Promise(resolve => this.#activityWaiters.add(resolve));
}
}

async #load() {
const href = this.getAttribute('href');

if (!href) {
return;
}

this.#abortController?.abort();
this.#abortController = new AbortController();

const { signal } = this.#abortController;
const compat = this.compat;
const useAdapter = compat === 'webchat' || compat === 'both';
const useEvents = compat === 'events' || compat === 'both';

try {
for await (const activity of forIterator({}, this.#fetchStreamed(href, signal))) {
if (signal.aborted) {
break;
}

useAdapter && this.#emitToAdapter(activity);
useEvents && this.#emitEvent(activity.raw);
}
} catch (error) {
signal.aborted || console.error('event-stream-adapter:', error);
}
}

async *#fetchStreamed(href, signal) {
const typingMap = new Map();
const res = await fetch(new URL(href, location.href), { signal });

yield* res.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.pipeThrough(
new TransformStream({
transform({ data, event }, controller) {
if (event === 'end') {
controller.terminate();
return;
}

if (event !== 'activity') {
return;
}

const activity = JSON.parse(data);

activity.raw = { ...activity };

if (
activity.type === 'typing' &&
activity.text &&
activity.channelData?.streamType === 'streaming'
) {
const streamId = activity.channelData?.streamId || activity.id;
let accumulated = typingMap.get(streamId) || '';

if (activity.channelData?.chunkType === 'delta') {
accumulated += activity.text;
activity.text = accumulated;
} else {
accumulated = activity.text;
}

typingMap.set(streamId, accumulated);
}

controller.enqueue(activity);
}
})
)
.pipeThrough(createStreamCoalescer());
}
}
);
</script>
</body>
</html>
54 changes: 54 additions & 0 deletions __tests__/assets/esm/adapter/LivestreamSession.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* eslint-disable security/detect-object-injection */
/*!
* Copyright (C) Microsoft Corporation. All rights reserved.
*/

const NextLivestreamSequence = Symbol();
const PreviousActivitySymbol = Symbol();
const SessionIdSymbol = Symbol();

export default class LivestreamSession {
constructor(sessionId) {
this[NextLivestreamSequence] = 1;
this[PreviousActivitySymbol] = undefined;
this[SessionIdSymbol] = sessionId;
}

/**
* Last string, useful for decompressing delta-compressed chunks.
*/
get previousActivity() {
return this[PreviousActivitySymbol];
}

set previousActivity(value) {
this[PreviousActivitySymbol] = value;
}

/**
* Activity ID of the session (and the first activity.)
*
* @type {string}
*/
get sessionId() {
return this[SessionIdSymbol];
}

get isConcluded() {
return this[NextLivestreamSequence] === Infinity;
}

/** @return {number} */
getNextLivestreamSequence(
/** @type {boolean | undefined} */
isFinal = false
) {
if (isFinal) {
this.previousActivity = undefined;

return (this[NextLivestreamSequence] = Infinity);
}

return this[NextLivestreamSequence]++;
}
}
80 changes: 80 additions & 0 deletions __tests__/assets/esm/adapter/LivestreamSessionManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/* eslint-disable security/detect-object-injection */

/*!
* Copyright (C) Microsoft Corporation. All rights reserved.
*/

import LivestreamSession from './LivestreamSession.js';

const ActiveLivestreamSymbol = Symbol();

export default class LivestreamSessionManager {
constructor() {
this[ActiveLivestreamSymbol] = new Map();
}

*concludeAll() {
for (const [sessionId, session] of this[ActiveLivestreamSymbol]) {
if (!session.isConcluded) {
const { previousActivity } = session;
const entitiesWithoutStreamInfo = (previousActivity?.entities ?? []).filter(
({ type }) => type !== 'streaminfo'
);

yield Object.freeze({
...previousActivity,
channelData: Object.freeze({
...previousActivity?.channelData,
chunkType: undefined,
streamId: sessionId,
streamSequence: undefined,
streamType: 'final'
}),
entities: Object.freeze([...entitiesWithoutStreamInfo]),
id: `${sessionId}/final`,
text: previousActivity?.text,
type: 'message'
});
}
}
}

has(livestreamSessionId) {
return this[ActiveLivestreamSymbol].has(livestreamSessionId);
}

*sequence(livestreamSessionId, activity, isFinal = false) {
let livestreamSession = this[ActiveLivestreamSymbol].get(livestreamSessionId);

if (!livestreamSession) {
livestreamSession = new LivestreamSession(livestreamSessionId);

this[ActiveLivestreamSymbol].set(livestreamSessionId, livestreamSession);
}

if (livestreamSession.isConcluded) {
return;
}

const streamSequence = livestreamSession.getNextLivestreamSequence(isFinal);
const entitiesWithoutStreamInfo = (activity.entities ?? []).filter(({ type }) => type !== 'streaminfo');

// We assume the chat adapter will do delta decompression.
livestreamSession.previousActivity = activity;

yield Object.freeze({
...activity,
channelData: Object.freeze({
...activity.channelData,
chunkType: undefined,
streamId: streamSequence === 1 ? undefined : livestreamSessionId,
streamSequence: streamSequence === Infinity ? undefined : streamSequence,
streamType: streamSequence === Infinity ? 'final' : 'streaming'
}),
entities: Object.freeze([...entitiesWithoutStreamInfo]),
id: streamSequence === 1 ? livestreamSessionId : activity.id,
text: livestreamSession.previousActivity.text,
type: streamSequence === Infinity ? 'message' : 'typing'
});
}
}
Loading
Loading