## Overview
DAP Net Stream provides high-level streaming communication infrastructure for the DAP ecosystem, implementing reliable data streams, message channels, and protocol management for distributed applications. This module enables robust, session-based communication with automatic connection management, multiplexing, and event-driven data processing.
**Based on:** `dap-sdk/net/stream/include/dap_stream.h`, `dap-sdk/net/stream/src/dap_stream.c`
## Document Structure
- [[#Overview|Overview]]
- [[#Module Structures|Module Structures]]
- [[#dap_stream_t|dap_stream_t - Stream Instance]]
- [[#dap_stream_session_t|dap_stream_session_t - Session Management]]
- [[#dap_stream_ch_t|dap_stream_ch_t - Stream Channel]]
- [[#dap_stream_packet_t|dap_stream_packet_t - Data Packet]]
- [[#dap_stream_worker_t|dap_stream_worker_t - Stream Worker]]
- [[#Module Functions|Module Functions]]
- [[#Stream Management|Stream Creation and Lifecycle]]
- [[#Session Operations|Session Management]]
- [[#Channel Operations|Channel Management]]
- [[#Data Transfer|Packet Transmission]]
- [[#Error Codes|Error Codes]]
- [[#Typical Examples|Typical Examples]]
## Module Structures
### dap_stream_t
Core stream structure representing a communication stream with session and channel management.
```c
typedef struct dap_stream {
struct {
dap_events_socket_t *esocket; // Underlying event socket
dap_stream_session_t *session; // Associated session
dap_time_t ts_created; // Stream creation timestamp
dap_time_t ts_last_activity; // Last activity timestamp
size_t bytes_sent; // Total bytes sent
size_t bytes_received; // Total bytes received
bool is_active; // Stream active status
uint32_t channel_count; // Number of active channels
dap_stream_state_t state; // Current stream state
} pub;
struct {
dap_htable_t *channel_table; // Channel hash table
dap_list_t *channel_list; // Active channels list
dap_buffer_t *send_buffer; // Send buffer queue
dap_buffer_t *recv_buffer; // Receive buffer
dap_stream_worker_t *worker; // Associated worker thread
pthread_mutex_t stream_mutex; // Stream synchronization
dap_stream_callbacks_t *callbacks; // Event callbacks
void *callbacks_arg; // Callback arguments
bool is_encrypted; // Encryption status
dap_enc_t *encryption_context; // Encryption context
} priv;
} dap_stream_t;
```
**Public Fields:**
- `esocket` - Underlying event socket for network communication
- `session` - Associated session context for authentication and state
- `ts_created` - Timestamp when stream was created
- `ts_last_activity` - Last time data was sent or received
- `bytes_sent` - Total bytes transmitted through this stream
- `bytes_received` - Total bytes received through this stream
- `is_active` - Whether stream is currently active and processing
- `channel_count` - Number of logical channels multiplexed on this stream
- `state` - Current operational state of the stream
### dap_stream_session_t
Session management structure handling authentication and persistent state.
```c
typedef struct dap_stream_session {
struct {
char session_id[DAP_STREAM_SESSION_ID_SIZE]; // Unique session identifier
dap_time_t ts_created; // Session creation time
dap_time_t ts_last_access; // Last access time
dap_time_t ts_expires; // Session expiration time
bool is_authenticated; // Authentication status
char *user_name; // Authenticated username
uint32_t permissions; // User permissions mask
dap_session_state_t state; // Session state
} pub;
struct {
dap_htable_t *data_table; // Session data storage
dap_list_t *stream_list; // Associated streams
dap_auth_context_t *auth_context; // Authentication context
pthread_mutex_t session_mutex; // Session synchronization
dap_session_stats_t *statistics; // Session statistics
bool is_persistent; // Persistent session flag
uint32_t reference_count; // Reference counter
dap_timer_t *timeout_timer; // Session timeout timer
} priv;
} dap_stream_session_t;
```
### dap_stream_ch_t
Stream channel structure for multiplexed communication over a single stream.
```c
typedef struct dap_stream_ch {
struct {
dap_stream_t *stream; // Parent stream
uint8_t id; // Channel identifier
dap_stream_ch_type_t type; // Channel type
dap_stream_ch_state_t state; // Channel state
size_t bytes_sent; // Bytes sent on channel
size_t bytes_received; // Bytes received on channel
dap_time_t ts_created; // Channel creation time
dap_time_t ts_last_activity; // Last activity time
bool is_ready; // Channel ready status
} pub;
struct {
dap_stream_ch_proc_t *proc; // Channel processor
void *internal; // Internal channel data
dap_buffer_t *send_buffer; // Channel send buffer
dap_buffer_t *recv_buffer; // Channel receive buffer
dap_stream_ch_callbacks_t *callbacks; // Channel callbacks
void *callback_arg; // Callback argument
pthread_mutex_t ch_mutex; // Channel synchronization
dap_stream_ch_stats_t *statistics; // Channel statistics
bool is_encrypted; // Channel encryption
} priv;
} dap_stream_ch_t;
```
### dap_stream_packet_t
Data packet structure for stream communication with headers and payload.
```c
typedef struct dap_stream_packet {
struct {
dap_stream_packet_hdr_t hdr; // Packet header
void *data; // Packet payload data
size_t data_size; // Size of payload
dap_time_t timestamp; // Packet timestamp
uint32_t sequence_id; // Sequence identifier
uint8_t channel_id; // Target channel
dap_packet_type_t type; // Packet type
uint16_t flags; // Packet flags
} pub;
struct {
dap_hash_fast_t checksum; // Packet checksum
bool is_encrypted; // Encryption status
uint32_t retry_count; // Retry attempts
dap_time_t expires; // Packet expiration
dap_stream_packet_t *next; // Next packet in queue
bool requires_ack; // Acknowledgment required
} priv;
} dap_stream_packet_t;
```
### dap_stream_worker_t
Stream worker structure for processing streams in dedicated threads.
```c
typedef struct dap_stream_worker {
struct {
uint32_t id; // Worker identifier
pthread_t thread; // Worker thread
bool is_running; // Worker running status
uint32_t streams_count; // Number of assigned streams
uint64_t packets_processed; // Total packets processed
dap_time_t last_activity; // Last processing activity
dap_worker_state_t state; // Worker state
} pub;
struct {
dap_list_t *stream_list; // Assigned streams
dap_queue_t *packet_queue; // Packet processing queue
dap_events_t *events; // Event handling context
pthread_mutex_t worker_mutex; // Worker synchronization
pthread_cond_t worker_cond; // Worker condition variable
dap_stream_worker_callbacks_t *callbacks; // Worker callbacks
bool shutdown_requested; // Shutdown flag
dap_stream_worker_stats_t *stats; // Worker statistics
} priv;
} dap_stream_worker_t;
```
## Module Functions
### Stream Management
#### `dap_stream_new()`
Creates new stream instance with socket and session.
```c
dap_stream_t* dap_stream_new(dap_events_socket_t *a_esocket, dap_stream_session_t *a_session);
```
**Parameters:**
- `a_esocket` (dap_events_socket_t*) - Event socket for communication
- `a_session` (dap_stream_session_t*) - Session context
**Returns:**
- `dap_stream_t*` - Created stream instance
- `NULL` - Stream creation failed
**Error Conditions:**
- Returns NULL if a_esocket is NULL or invalid
- Returns NULL if memory allocation fails
- Returns NULL if stream initialization fails
**Description:** Creates a new stream instance associated with an event socket and session. The stream is ready for channel creation and data transmission.
#### `dap_stream_delete()`
Destroys stream and frees all associated resources.
```c
void dap_stream_delete(dap_stream_t *a_stream);
```
**Parameters:**
- `a_stream` (dap_stream_t*) - Stream to destroy
**Description:** Safely destroys the stream by closing all channels, flushing buffers, and freeing allocated memory.
#### `dap_stream_set_ready_to_write()`
Marks stream as ready for writing operations.
```c
void dap_stream_set_ready_to_write(dap_stream_t *a_stream, bool a_is_ready);
```
**Parameters:**
- `a_stream` (dap_stream_t*) - Stream instance
- `a_is_ready` (bool) - Ready status
#### `dap_stream_get_stats()`
Retrieves stream statistics and performance metrics.
```c
dap_stream_stats_t* dap_stream_get_stats(dap_stream_t *a_stream);
```
### Session Operations
#### `dap_stream_session_new()`
Creates new session for stream authentication and state management.
```c
dap_stream_session_t* dap_stream_session_new(void);
```
**Returns:**
- `dap_stream_session_t*` - Created session instance
- `NULL` - Session creation failed
**Description:** Creates a new session with unique identifier and default configuration. Session provides authentication context and persistent state storage.
#### `dap_stream_session_delete()`
Destroys session and cleans up resources.
```c
void dap_stream_session_delete(dap_stream_session_t *a_session);
```
#### `dap_stream_session_authenticate()`
Performs authentication for the session.
```c
int dap_stream_session_authenticate(dap_stream_session_t *a_session, const char *a_username, const char *a_password);
```
**Parameters:**
- `a_session` (dap_stream_session_t*) - Session to authenticate
- `a_username` (const char*) - Username for authentication
- `a_password` (const char*) - Password for authentication
**Returns:**
- `0` - Authentication successful
- `-1` - Invalid parameters
- `-2` - Authentication failed
- `-3` - Session expired
#### `dap_stream_session_set_data()`
Stores data in session context.
```c
int dap_stream_session_set_data(dap_stream_session_t *a_session, const char *a_key, const void *a_data, size_t a_data_size);
```
### Channel Operations
#### `dap_stream_ch_new()`
Creates new channel on existing stream.
```c
dap_stream_ch_t* dap_stream_ch_new(dap_stream_t *a_stream, uint8_t a_ch_id);
```
**Parameters:**
- `a_stream` (dap_stream_t*) - Parent stream
- `a_ch_id` (uint8_t) - Channel identifier
**Returns:**
- `dap_stream_ch_t*` - Created channel instance
- `NULL` - Channel creation failed
**Error Conditions:**
- Returns NULL if a_stream is NULL or invalid
- Returns NULL if a_ch_id is already in use
- Returns NULL if maximum channels exceeded
#### `dap_stream_ch_delete()`
Destroys channel and removes from stream.
```c
void dap_stream_ch_delete(dap_stream_ch_t *a_ch);
```
#### `dap_stream_ch_write()`
Writes data to channel.
```c
ssize_t dap_stream_ch_write(dap_stream_ch_t *a_ch, const void *a_data, size_t a_data_size);
```
**Parameters:**
- `a_ch` (dap_stream_ch_t*) - Channel to write to
- `a_data` (const void*) - Data to write
- `a_data_size` (size_t) - Size of data
**Returns:**
- `>= 0` - Number of bytes written
- `-1` - Write operation failed
#### `dap_stream_ch_read()`
Reads data from channel.
```c
ssize_t dap_stream_ch_read(dap_stream_ch_t *a_ch, void *a_data, size_t a_data_size);
```
### Data Transfer
#### `dap_stream_packet_new()`
Creates new packet for stream transmission.
```c
dap_stream_packet_t* dap_stream_packet_new(uint8_t a_ch_id, const void *a_data, size_t a_data_size);
```
**Parameters:**
- `a_ch_id` (uint8_t) - Target channel identifier
- `a_data` (const void*) - Packet payload data
- `a_data_size` (size_t) - Size of payload
**Returns:**
- `dap_stream_packet_t*` - Created packet
- `NULL` - Packet creation failed
#### `dap_stream_packet_send()`
Sends packet through stream.
```c
int dap_stream_packet_send(dap_stream_t *a_stream, dap_stream_packet_t *a_packet);
```
**Parameters:**
- `a_stream` (dap_stream_t*) - Stream to send through
- `a_packet` (dap_stream_packet_t*) - Packet to send
**Returns:**
- `0` - Packet sent successfully
- `-1` - Send operation failed
## Error Codes
### Stream Error Codes
```c
typedef enum dap_stream_error {
DAP_STREAM_ERROR_SUCCESS = 0, // Operation successful
DAP_STREAM_ERROR_INVALID_PARAM = -1, // Invalid parameter provided
DAP_STREAM_ERROR_NO_MEMORY = -2, // Memory allocation failed
DAP_STREAM_ERROR_SOCKET_FAILED = -3, // Socket operation failed
DAP_STREAM_ERROR_CHANNEL_NOT_FOUND = -4, // Channel not found
DAP_STREAM_ERROR_CHANNEL_EXISTS = -5, // Channel already exists
DAP_STREAM_ERROR_SESSION_INVALID = -6, // Invalid session
DAP_STREAM_ERROR_AUTH_FAILED = -7, // Authentication failed
DAP_STREAM_ERROR_PACKET_INVALID = -8, // Invalid packet format
DAP_STREAM_ERROR_BUFFER_FULL = -9, // Buffer is full
DAP_STREAM_ERROR_TIMEOUT = -10, // Operation timeout
DAP_STREAM_ERROR_ENCRYPTION_FAILED = -11, // Encryption failed
DAP_STREAM_ERROR_CHANNEL_LIMIT = -12, // Channel limit reached
DAP_STREAM_ERROR_STREAM_CLOSED = -13 // Stream is closed
} dap_stream_error_t;
```
## Typical Examples
### Basic Stream Communication Example
```c
#include "dap_common.h"
#include "dap_stream.h"
// Channel data callback
void channel_data_callback(dap_stream_ch_t *a_ch, void *a_data, size_t a_data_size, void *a_arg) {
log_it(L_INFO, "Received %zu bytes on channel %u", a_data_size, a_ch->pub.id);
// Echo data back
dap_stream_ch_write(a_ch, a_data, a_data_size);
}
int example_stream_communication() {
log_it(L_INFO, "=== Stream Communication Example ===");
// Step 1: Create session
dap_stream_session_t *session = dap_stream_session_new();
if (!session) {
log_it(L_ERROR, "✗ Failed to create session");
return -1;
}
log_it(L_INFO, "✓ Session created with ID: %s", session->pub.session_id);
// Step 2: Authenticate session
int result = dap_stream_session_authenticate(session, "testuser", "password123");
if (result != 0) {
log_it(L_ERROR, "✗ Session authentication failed: %d", result);
dap_stream_session_delete(session);
return -1;
}
log_it(L_INFO, "✓ Session authenticated for user: %s", session->pub.user_name);
// Step 3: Create event socket (simplified - normally from connection)
dap_events_socket_t *socket = dap_events_socket_create_tcp(NULL);
if (!socket) {
log_it(L_ERROR, "✗ Failed to create socket");
dap_stream_session_delete(session);
return -1;
}
// Step 4: Create stream
dap_stream_t *stream = dap_stream_new(socket, session);
if (!stream) {
log_it(L_ERROR, "✗ Failed to create stream");
dap_events_socket_delete(socket);
dap_stream_session_delete(session);
return -1;
}
log_it(L_INFO, "✓ Stream created successfully");
// Step 5: Create channels
dap_stream_ch_t *data_channel = dap_stream_ch_new(stream, 1);
if (!data_channel) {
log_it(L_ERROR, "✗ Failed to create data channel");
goto cleanup;
}
log_it(L_INFO, "✓ Data channel created (ID: %u)", data_channel->pub.id);
dap_stream_ch_t *control_channel = dap_stream_ch_new(stream, 2);
if (!control_channel) {
log_it(L_ERROR, "✗ Failed to create control channel");
goto cleanup;
}
log_it(L_INFO, "✓ Control channel created (ID: %u)", control_channel->pub.id);
// Step 6: Set channel callbacks
dap_stream_ch_callbacks_t callbacks = {
.data_callback = channel_data_callback,
.ready_callback = NULL,
.error_callback = NULL
};
data_channel->priv.callbacks = &callbacks;
control_channel->priv.callbacks = &callbacks;
// Step 7: Send test data
const char *test_message = "Hello from DAP Stream!";
ssize_t sent = dap_stream_ch_write(data_channel, test_message, strlen(test_message));
if (sent > 0) {
log_it(L_INFO, "✓ Sent %zd bytes to data channel", sent);
} else {
log_it(L_ERROR, "✗ Failed to send data to channel");
}
// Step 8: Send control message
const char *control_msg = "{\"command\":\"status\",\"id\":1}";
sent = dap_stream_ch_write(control_channel, control_msg, strlen(control_msg));
if (sent > 0) {
log_it(L_INFO, "✓ Sent %zd bytes to control channel", sent);
}
// Step 9: Display stream statistics
dap_stream_stats_t *stats = dap_stream_get_stats(stream);
if (stats) {
log_it(L_INFO, "Stream Statistics:");
log_it(L_INFO, " Active channels: %u", stream->pub.channel_count);
log_it(L_INFO, " Bytes sent: %zu", stream->pub.bytes_sent);
log_it(L_INFO, " Bytes received: %zu", stream->pub.bytes_received);
log_it(L_INFO, " Last activity: %lu", stream->pub.ts_last_activity);
}
cleanup:
// Step 10: Cleanup
if (data_channel) dap_stream_ch_delete(data_channel);
if (control_channel) dap_stream_ch_delete(control_channel);
dap_stream_delete(stream);
dap_stream_session_delete(session);
log_it(L_INFO, "✓ Stream Communication example completed");
return 0;
}
```
### Multi-Channel Stream Example
```c
#include "dap_common.h"
#include "dap_stream.h"
typedef struct channel_context {
const char *channel_name;
uint32_t message_count;
uint64_t total_bytes;
} channel_context_t;
void multi_channel_callback(dap_stream_ch_t *a_ch, void *a_data, size_t a_data_size, void *a_arg) {
channel_context_t *ctx = (channel_context_t*)a_arg;
ctx->message_count++;
ctx->total_bytes += a_data_size;
log_it(L_INFO, "Channel %s: Message %u (%zu bytes)",
ctx->channel_name, ctx->message_count, a_data_size);
// Process data based on channel type
if (strcmp(ctx->channel_name, "file_transfer") == 0) {
// Handle file transfer data
log_it(L_DEBUG, "Processing file chunk: %zu bytes", a_data_size);
} else if (strcmp(ctx->channel_name, "messaging") == 0) {
// Handle messaging data
char *message = (char*)a_data;
log_it(L_INFO, "Message received: %.*s", (int)a_data_size, message);
}
}
int example_multi_channel_stream() {
log_it(L_INFO, "=== Multi-Channel Stream Example ===");
// Step 1: Setup session and stream
dap_stream_session_t *session = dap_stream_session_new();
if (!session) {
log_it(L_ERROR, "✗ Failed to create session");
return -1;
}
dap_stream_session_authenticate(session, "multiuser", "secret");
dap_events_socket_t *socket = dap_events_socket_create_tcp(NULL);
dap_stream_t *stream = dap_stream_new(socket, session);
if (!stream) {
log_it(L_ERROR, "✗ Failed to create stream");
goto cleanup_session;
}
// Step 2: Create multiple channels with contexts
channel_context_t file_ctx = {"file_transfer", 0, 0};
channel_context_t msg_ctx = {"messaging", 0, 0};
channel_context_t status_ctx = {"status", 0, 0};
dap_stream_ch_t *file_channel = dap_stream_ch_new(stream, 10);
dap_stream_ch_t *msg_channel = dap_stream_ch_new(stream, 20);
dap_stream_ch_t *status_channel = dap_stream_ch_new(stream, 30);
if (!file_channel || !msg_channel || !status_channel) {
log_it(L_ERROR, "✗ Failed to create channels");
goto cleanup_stream;
}
// Step 3: Set callbacks with contexts
dap_stream_ch_callbacks_t callbacks = {
.data_callback = multi_channel_callback
};
file_channel->priv.callbacks = &callbacks;
file_channel->priv.callback_arg = &file_ctx;
msg_channel->priv.callbacks = &callbacks;
msg_channel->priv.callback_arg = &msg_ctx;
status_channel->priv.callbacks = &callbacks;
status_channel->priv.callback_arg = &status_ctx;
log_it(L_INFO, "✓ Created 3 channels: file_transfer(10), messaging(20), status(30)");
// Step 4: Simulate different types of data transmission
// File transfer simulation
log_it(L_INFO, "Simulating file transfer...");
for (int i = 0; i < 5; i++) {
char file_chunk[256];
snprintf(file_chunk, sizeof(file_chunk), "FILE_CHUNK_%03d: %s",
i, "Binary data would be here in real implementation");
dap_stream_ch_write(file_channel, file_chunk, strlen(file_chunk));
usleep(100000); // 100ms delay
}
// Messaging simulation
log_it(L_INFO, "Simulating messaging...");
const char *messages[] = {
"Hello from multi-channel stream!",
"This is message number 2",
"Channels work independently",
"Each channel has its own context",
"Final message in this demo"
};
for (int i = 0; i < 5; i++) {
dap_stream_ch_write(msg_channel, messages[i], strlen(messages[i]));
usleep(200000); // 200ms delay
}
// Status updates simulation
log_it(L_INFO, "Simulating status updates...");
for (int i = 0; i < 3; i++) {
char status_msg[128];
snprintf(status_msg, sizeof(status_msg),
"{\"status\":\"active\",\"timestamp\":%lu,\"sequence\":%d}",
dap_time_now(), i + 1);
dap_stream_ch_write(status_channel, status_msg, strlen(status_msg));
sleep(1);
}
// Step 5: Display channel statistics
log_it(L_INFO, "Channel Statistics:");
log_it(L_INFO, " File Transfer Channel:");
log_it(L_INFO, " Messages: %u", file_ctx.message_count);
log_it(L_INFO, " Total bytes: %lu", file_ctx.total_bytes);
log_it(L_INFO, " Bytes sent: %zu", file_channel->pub.bytes_sent);
log_it(L_INFO, " Messaging Channel:");
log_it(L_INFO, " Messages: %u", msg_ctx.message_count);
log_it(L_INFO, " Total bytes: %lu", msg_ctx.total_bytes);
log_it(L_INFO, " Bytes sent: %zu", msg_channel->pub.bytes_sent);
log_it(L_INFO, " Status Channel:");
log_it(L_INFO, " Messages: %u", status_ctx.message_count);
log_it(L_INFO, " Total bytes: %lu", status_ctx.total_bytes);
log_it(L_INFO, " Bytes sent: %zu", status_channel->pub.bytes_sent);
// Step 6: Overall stream statistics
log_it(L_INFO, "Overall Stream Statistics:");
log_it(L_INFO, " Total channels: %u", stream->pub.channel_count);
log_it(L_INFO, " Stream bytes sent: %zu", stream->pub.bytes_sent);
log_it(L_INFO, " Stream bytes received: %zu", stream->pub.bytes_received);
log_it(L_INFO, " Stream active: %s", stream->pub.is_active ? "yes" : "no");
cleanup_stream:
if (file_channel) dap_stream_ch_delete(file_channel);
if (msg_channel) dap_stream_ch_delete(msg_channel);
if (status_channel) dap_stream_ch_delete(status_channel);
dap_stream_delete(stream);
cleanup_session:
dap_stream_session_delete(session);
log_it(L_INFO, "✓ Multi-Channel Stream example completed");
return 0;
}
```
---
**See also:** [[Module DAP Net - Client|Module DAP Net - Client]], [[Module DAP Net - Server|Module DAP Net - Server]], [[Module DAP IO|Module DAP IO]]
---
*Based on: `dap-sdk/net/stream/include/dap_stream.h`, `dap-sdk/net/stream/src/dap_stream.c`*