Streaming messages

This guide covers how to receive and fetch streaming messages in direct channels and group channels using the Nexconn SDK.

Overview

Streaming messages are triggered by your business server, generated by the IM server, and delivered to the client SDK.

The SDK defines a StreamMessage content class extending MessageContent:

Property | Type | Description
content | String | Aggregated stream text content.
type | String | The text format of the streaming message.
isComplete | Boolean | Whether the stream has finished generating content.
completeReason | Int | Business-side completion reason. 0 indicates normal.
stopReason | Int | IM-side completion reason. 0 indicates normal.
sync | Boolean | Whether the client has finished fetching the stream.

Tip
  • Streaming messages cannot be sent from the SDK. They are delivered via onMessageReceived in a handler registered with NCEngine.addMessageHandler.
  • After receiving a streaming message, call message.requestStreamMessage() to fetch the full content.
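
The flags in the table above can be combined into a single display state. The following is a minimal sketch, not SDK code: StreamSnapshot is a hypothetical stand-in that mirrors the documented properties, StreamState is an invented enum, and the precedence of the checks (failure first, then fetch status, then completion) is an assumption you may want to adjust.

```kotlin
// Hypothetical stand-in for the SDK's StreamMessage, reduced to the
// documented properties. Not the real SDK class.
data class StreamSnapshot(
    val content: String,
    val isComplete: Boolean,
    val completeReason: Int,
    val stopReason: Int,
    val sync: Boolean
)

// Hypothetical UI states derived from the documented flags.
enum class StreamState { NEEDS_FETCH, GENERATING, FINISHED, FAILED }

fun stateOf(s: StreamSnapshot): StreamState = when {
    // 0 indicates normal, so any other value on either side is treated as abnormal.
    s.completeReason != 0 || s.stopReason != 0 -> StreamState.FAILED
    // The client has not finished fetching the stream yet.
    !s.sync -> StreamState.NEEDS_FETCH
    // Fetched and the stream has finished generating.
    s.isComplete -> StreamState.FINISHED
    // Fetched so far, but the stream is still generating (assumed interpretation).
    else -> StreamState.GENERATING
}
```

With the real SDK you would build the snapshot from the fields of message.content after checking that it is a StreamMessage.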

Receive streaming messages

Streaming messages are triggered by the server. Recipients receive them through the message handler.

kotlin
NCEngine.addMessageHandler("STREAM_HANDLER", object : MessageHandler {
    override fun onMessageReceived(message: Message, left: Int, hasPackage: Boolean, offline: Boolean) {
        // Smart cast: inside the if block, streamMessage is typed as StreamMessage
        val streamMessage = message.content
        if (streamMessage is StreamMessage) {
            // Handle the streaming message here
        }
    }
})

Historical streaming messages

Streaming messages are stored in the local database after receipt. Use the standard Get message history APIs to retrieve them.

Fetch streaming content

A received streaming message initially contains only the first chunk of content. Call message.requestStreamMessage() to fetch the remaining content incrementally, and register a MessageHandler to receive the fetch events.

Listen for streaming events

Register a MessageHandler to receive streaming fetch events. The fetch process has three event callbacks:

kotlin
NCEngine.addMessageHandler("STREAM_HANDLER", object : MessageHandler {
    override fun onStreamMessageRequestInit(event: StreamMessageRequestInitEvent) {
        // Request is ready; stale data from interrupted sessions is cleaned up
        Log.d("Stream", "Stream init: ${event.messageId}")
    }

    override fun onStreamMessageRequestDelta(event: StreamMessageDeltaEvent) {
        // Incremental chunk received; event.message contains the aggregated content so far
        val delta = event.chunkInfo.content
        val aggregated = (event.message.content as? StreamMessage)?.content
        Log.d("Stream", "Delta: $delta")
    }

    override fun onStreamMessageRequestComplete(event: StreamMessageRequestCompleteEvent) {
        // A null error means the fetch finished successfully
        if (event.error == null) {
            Log.d("Stream", "Stream complete: ${event.messageId}")
        } else {
            Log.e("Stream", "Stream error: ${event.error?.message}")
        }
    }
})
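
The three callbacks map naturally onto a small per-message buffer, which can help if you render deltas yourself rather than reading the aggregated content from event.message. The sketch below is a hypothetical helper, not part of the SDK. It assumes the message ID can be used as a String key; the actual type of event.messageId is not stated in this guide.

```kotlin
// Hypothetical helper, not part of the SDK: collects delta text per message ID.
class StreamBuffer {
    private val buffers = mutableMapOf<String, StringBuilder>()

    // Call from onStreamMessageRequestInit: start from a clean buffer,
    // discarding anything left over from an interrupted fetch.
    fun onInit(messageId: String) {
        buffers[messageId] = StringBuilder()
    }

    // Call from onStreamMessageRequestDelta: append one chunk and
    // return the text accumulated so far.
    fun onDelta(messageId: String, chunk: String): String =
        buffers.getOrPut(messageId) { StringBuilder() }.append(chunk).toString()

    // Call from onStreamMessageRequestComplete: return the final text
    // and release the buffer. Returns "" if nothing was buffered.
    fun onComplete(messageId: String): String =
        buffers.remove(messageId)?.toString().orEmpty()
}
```

The buffer is not thread-safe; if the callbacks can arrive on different threads, guard it with a lock or confine it to one thread.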

Start fetching

After receiving a streaming message, check its sync property. If it is false, the client has not finished fetching the stream, so initiate the fetch:

kotlin
NCEngine.addMessageHandler("STREAM_RECEIVER", object : MessageHandler {
    override fun onMessageReceived(message: Message, left: Int, hasPackage: Boolean, offline: Boolean) {
        val content = message.content
        // Only fetch when the stream has not been fully fetched yet
        if (content is StreamMessage && !content.sync) {
            message.requestStreamMessage { error ->
                if (error != null) {
                    Log.e("Stream", "Request failed: ${error.message}")
                }
            }
        }
    }
})
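
If the same message can reach your code more than once (for example from the handler and again from a history load), you may want to avoid starting a second fetch while one is running. This guide does not say how the SDK treats overlapping requests, so the following is only a defensive sketch. FetchGuard is a hypothetical class, and the String message ID is an assumption.

```kotlin
// Hypothetical guard, not part of the SDK: tracks which messages
// currently have a fetch in flight.
class FetchGuard {
    private val inFlight = mutableSetOf<String>()

    // Returns true if the caller should start a fetch for this message.
    @Synchronized
    fun tryBegin(messageId: String, sync: Boolean): Boolean {
        if (sync) return false          // already fully fetched
        return inFlight.add(messageId)  // false when a fetch is already running
    }

    // Call when the request fails or the complete event fires.
    @Synchronized
    fun end(messageId: String) {
        inFlight.remove(messageId)
    }
}
```

In the handler, you would call tryBegin before message.requestStreamMessage and call end from the request's error branch and from onStreamMessageRequestComplete.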

Streaming message summary

When the server finishes receiving the stream content, it sends a summary via message metadata. Listen for updates using MessageHandler.onMessageMetadataUpdated:

kotlin
NCEngine.addMessageHandler("STREAM_META_HANDLER", object : MessageHandler {
    override fun onMessageMetadataUpdated(metadata: Map<String, String>, message: Message) {
        if (message.content is StreamMessage) {
            // The summary is stored under a fixed metadata key
            val summary = metadata["RC_Ext_StreamMsgSummary"]
            Log.d("Stream", "Summary: $summary")
        }
    }
})

Get the summary from history

After the metadata update, the summary is stored in the database. For a message loaded from history, check the streaming message's isComplete property, then read the summary from message.metadata:

kotlin
val streamMessage = message.content
// Smart cast: isComplete is only evaluated when the content is a StreamMessage
if (streamMessage is StreamMessage && streamMessage.isComplete) {
    val summary = message.metadata?.get("RC_Ext_StreamMsgSummary")
}
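
The summary may be absent, for example when the stream is not complete or the metadata has not arrived yet. The sketch below shows one way to choose the text for a conversation-list preview. previewText, its parameters, and the truncation fallback are all hypothetical; only the metadata key comes from the snippets above.

```kotlin
// Metadata key used in this guide for the stream summary.
val STREAM_SUMMARY_KEY = "RC_Ext_StreamMsgSummary"

// Hypothetical helper: prefer the server-provided summary for a completed
// stream, otherwise fall back to a truncated preview of the content.
fun previewText(
    metadata: Map<String, String>?,
    isComplete: Boolean,
    content: String,
    maxLen: Int = 40
): String {
    // Only trust the summary once the stream is marked complete.
    val summary = if (isComplete) metadata?.get(STREAM_SUMMARY_KEY) else null
    if (!summary.isNullOrBlank()) return summary
    // Fallback: the content itself, cut to maxLen characters.
    return if (content.length <= maxLen) content else content.take(maxLen) + "..."
}
```

With the SDK types, the arguments would come from message.metadata and from the isComplete and content properties of the StreamMessage.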