## Overview

DAP Net Stream provides high-level streaming communication infrastructure for the DAP ecosystem, implementing reliable data streams, message channels, and protocol management for distributed applications. This module enables robust, session-based communication with automatic connection management, multiplexing, and event-driven data processing.

**Based on:** `dap-sdk/net/stream/include/dap_stream.h`, `dap-sdk/net/stream/src/dap_stream.c`

## Document Structure

- [[#Overview|Overview]]
- [[#Module Structures|Module Structures]]
  - [[#dap_stream_t|dap_stream_t - Stream Instance]]
  - [[#dap_stream_session_t|dap_stream_session_t - Session Management]]
  - [[#dap_stream_ch_t|dap_stream_ch_t - Stream Channel]]
  - [[#dap_stream_packet_t|dap_stream_packet_t - Data Packet]]
  - [[#dap_stream_worker_t|dap_stream_worker_t - Stream Worker]]
- [[#Module Functions|Module Functions]]
  - [[#Stream Management|Stream Creation and Lifecycle]]
  - [[#Session Operations|Session Management]]
  - [[#Channel Operations|Channel Management]]
  - [[#Data Transfer|Packet Transmission]]
- [[#Error Codes|Error Codes]]
- [[#Typical Examples|Typical Examples]]

## Module Structures

### dap_stream_t

Core stream structure representing a communication stream with session and channel management.

```c
/*
 * Stream instance. Fields are grouped into two nested structs:
 *   pub  - socket, session, timestamps, byte counters, activity flag,
 *          channel count and state;
 *   priv - channel containers, send/receive buffers, worker, mutex,
 *          callbacks and encryption data.
 */
typedef struct dap_stream {
    struct {
        dap_events_socket_t *esocket;      // Underlying event socket
        dap_stream_session_t *session;     // Associated session
        dap_time_t ts_created;             // Stream creation timestamp
        dap_time_t ts_last_activity;       // Last activity timestamp
        size_t bytes_sent;                 // Total bytes sent
        size_t bytes_received;             // Total bytes received
        bool is_active;                    // Stream active status
        uint32_t channel_count;            // Number of active channels
        dap_stream_state_t state;          // Current stream state
    } pub;
    struct {
        dap_htable_t *channel_table;       // Channel hash table
        dap_list_t *channel_list;          // Active channels list
        dap_buffer_t *send_buffer;         // Send buffer queue
        dap_buffer_t *recv_buffer;         // Receive buffer
        dap_stream_worker_t *worker;       // Associated worker thread
        pthread_mutex_t stream_mutex;      // Stream synchronization
        dap_stream_callbacks_t *callbacks; // Event callbacks
        void *callbacks_arg;               // Callback arguments
        bool is_encrypted;                 // Encryption status
        dap_enc_t *encryption_context;     // Encryption context
    } priv;
} dap_stream_t;
```

**Public Fields:**
- `esocket` - Underlying event socket for network communication
- `session` - Associated session context for authentication and state
- `ts_created` - Timestamp when stream was created
- `ts_last_activity` - Last time data was sent or received
- `bytes_sent` - Total bytes transmitted through this stream
- `bytes_received` - Total bytes received through this stream
- `is_active` - Whether stream is currently active and processing
- `channel_count` - Number of logical channels multiplexed on this stream
- `state` - Current operational state of the stream

### dap_stream_session_t

Session management structure handling authentication and persistent state.

```c
/*
 * Session object. Fields are grouped into two nested structs:
 *   pub  - session id, creation/access/expiry timestamps, authentication
 *          status, user name, permissions mask and state;
 *   priv - data table, stream list, authentication context, mutex,
 *          statistics, persistence flag, reference counter and timeout timer.
 */
typedef struct dap_stream_session {
    struct {
        char session_id[DAP_STREAM_SESSION_ID_SIZE]; // Unique session identifier
        dap_time_t ts_created;                       // Session creation time
        dap_time_t ts_last_access;                   // Last access time
        dap_time_t ts_expires;                       // Session expiration time
        bool is_authenticated;                       // Authentication status
        char *user_name;                             // Authenticated username
        uint32_t permissions;                        // User permissions mask
        dap_session_state_t state;                   // Session state
    } pub;
    struct {
        dap_htable_t *data_table;                    // Session data storage
        dap_list_t *stream_list;                     // Associated streams
        dap_auth_context_t *auth_context;            // Authentication context
        pthread_mutex_t session_mutex;               // Session synchronization
        dap_session_stats_t *statistics;             // Session statistics
        bool is_persistent;                          // Persistent session flag
        uint32_t reference_count;                    // Reference counter
        dap_timer_t *timeout_timer;                  // Session timeout timer
    } priv;
} dap_stream_session_t;
```

### dap_stream_ch_t

Stream channel structure for multiplexed communication over a single stream.

```c
/*
 * Logical channel multiplexed over a parent stream.
 *   pub  - parent stream, channel id/type/state, byte counters,
 *          timestamps and ready flag;
 *   priv - processor, internal data, buffers, callbacks with their
 *          argument, mutex, statistics and encryption flag.
 */
typedef struct dap_stream_ch {
    struct {
        dap_stream_t *stream;                 // Parent stream
        uint8_t id;                           // Channel identifier
        dap_stream_ch_type_t type;            // Channel type
        dap_stream_ch_state_t state;          // Channel state
        size_t bytes_sent;                    // Bytes sent on channel
        size_t bytes_received;                // Bytes received on channel
        dap_time_t ts_created;                // Channel creation time
        dap_time_t ts_last_activity;          // Last activity time
        bool is_ready;                        // Channel ready status
    } pub;
    struct {
        dap_stream_ch_proc_t *proc;           // Channel processor
        void *internal;                       // Internal channel data
        dap_buffer_t *send_buffer;            // Channel send buffer
        dap_buffer_t *recv_buffer;            // Channel receive buffer
        dap_stream_ch_callbacks_t *callbacks; // Channel callbacks
        void *callback_arg;                   // Callback argument
        pthread_mutex_t ch_mutex;             // Channel synchronization
        dap_stream_ch_stats_t *statistics;    // Channel statistics
        bool is_encrypted;                    // Channel encryption
    } priv;
} dap_stream_ch_t;
```

### dap_stream_packet_t

Data packet structure for stream communication with headers and payload.

```c
/*
 * Stream data packet.
 *   pub  - header, payload pointer and size, timestamp, sequence id,
 *          target channel, type and flags;
 *   priv - checksum, encryption flag, retry counter, expiration,
 *          queue link and acknowledgment flag.
 */
typedef struct dap_stream_packet {
    struct {
        dap_stream_packet_hdr_t hdr;          // Packet header
        void *data;                           // Packet payload data
        size_t data_size;                     // Size of payload
        dap_time_t timestamp;                 // Packet timestamp
        uint32_t sequence_id;                 // Sequence identifier
        uint8_t channel_id;                   // Target channel
        dap_packet_type_t type;               // Packet type
        uint16_t flags;                       // Packet flags
    } pub;
    struct {
        dap_hash_fast_t checksum;             // Packet checksum
        bool is_encrypted;                    // Encryption status
        uint32_t retry_count;                 // Retry attempts
        dap_time_t expires;                   // Packet expiration
        // The typedef name dap_stream_packet_t does not exist until the
        // closing line of this declaration, so the self-reference must go
        // through the struct tag.
        struct dap_stream_packet *next;       // Next packet in queue
        bool requires_ack;                    // Acknowledgment required
    } priv;
} dap_stream_packet_t;
```

### dap_stream_worker_t

Stream worker structure for processing streams in dedicated threads.

```c
/*
 * Stream worker. Fields are grouped into two nested structs:
 *   pub  - worker id, thread handle, running flag, stream and packet
 *          counters, last activity time and state;
 *   priv - stream list, packet queue, events context, mutex and condition
 *          variable, callbacks, shutdown flag and statistics.
 */
typedef struct dap_stream_worker {
    struct {
        uint32_t id;                              // Worker identifier
        pthread_t thread;                         // Worker thread
        bool is_running;                          // Worker running status
        uint32_t streams_count;                   // Number of assigned streams
        uint64_t packets_processed;               // Total packets processed
        dap_time_t last_activity;                 // Last processing activity
        dap_worker_state_t state;                 // Worker state
    } pub;
    struct {
        dap_list_t *stream_list;                  // Assigned streams
        dap_queue_t *packet_queue;                // Packet processing queue
        dap_events_t *events;                     // Event handling context
        pthread_mutex_t worker_mutex;             // Worker synchronization
        pthread_cond_t worker_cond;               // Worker condition variable
        dap_stream_worker_callbacks_t *callbacks; // Worker callbacks
        bool shutdown_requested;                  // Shutdown flag
        dap_stream_worker_stats_t *stats;         // Worker statistics
    } priv;
} dap_stream_worker_t;
```

## Module Functions

### Stream Management

#### `dap_stream_new()`

Creates new stream instance with socket and session.

```c
dap_stream_t* dap_stream_new(dap_events_socket_t *a_esocket, dap_stream_session_t *a_session);
```

**Parameters:**
- `a_esocket` (dap_events_socket_t*) - Event socket for communication
- `a_session` (dap_stream_session_t*) - Session context

**Returns:**
- `dap_stream_t*` - Created stream instance
- `NULL` - Stream creation failed

**Error Conditions:**
- Returns NULL if a_esocket is NULL or invalid
- Returns NULL if memory allocation fails
- Returns NULL if stream initialization fails

**Description:** Creates a new stream instance associated with an event socket and session. The stream is ready for channel creation and data transmission.

#### `dap_stream_delete()`

Destroys stream and frees all associated resources.

```c
void dap_stream_delete(dap_stream_t *a_stream);
```

**Parameters:**
- `a_stream` (dap_stream_t*) - Stream to destroy

**Description:** Safely destroys the stream by closing all channels, flushing buffers, and freeing allocated memory.

#### `dap_stream_set_ready_to_write()`

Marks stream as ready for writing operations.

```c
void dap_stream_set_ready_to_write(dap_stream_t *a_stream, bool a_is_ready);
```

**Parameters:**
- `a_stream` (dap_stream_t*) - Stream instance
- `a_is_ready` (bool) - Ready status

#### `dap_stream_get_stats()`

Retrieves stream statistics and performance metrics.

```c
dap_stream_stats_t* dap_stream_get_stats(dap_stream_t *a_stream);
```

### Session Operations

#### `dap_stream_session_new()`

Creates new session for stream authentication and state management.

```c
dap_stream_session_t* dap_stream_session_new(void);
```

**Returns:**
- `dap_stream_session_t*` - Created session instance
- `NULL` - Session creation failed

**Description:** Creates a new session with unique identifier and default configuration. Session provides authentication context and persistent state storage.

#### `dap_stream_session_delete()`

Destroys session and cleans up resources.

```c
void dap_stream_session_delete(dap_stream_session_t *a_session);
```

#### `dap_stream_session_authenticate()`

Performs authentication for the session.

```c
int dap_stream_session_authenticate(dap_stream_session_t *a_session, const char *a_username, const char *a_password);
```

**Parameters:**
- `a_session` (dap_stream_session_t*) - Session to authenticate
- `a_username` (const char*) - Username for authentication
- `a_password` (const char*) - Password for authentication

**Returns:**
- `0` - Authentication successful
- `-1` - Invalid parameters
- `-2` - Authentication failed
- `-3` - Session expired

#### `dap_stream_session_set_data()`

Stores data in session context.

```c
int dap_stream_session_set_data(dap_stream_session_t *a_session, const char *a_key, const void *a_data, size_t a_data_size);
```

### Channel Operations

#### `dap_stream_ch_new()`

Creates new channel on existing stream.

```c
dap_stream_ch_t* dap_stream_ch_new(dap_stream_t *a_stream, uint8_t a_ch_id);
```

**Parameters:**
- `a_stream` (dap_stream_t*) - Parent stream
- `a_ch_id` (uint8_t) - Channel identifier

**Returns:**
- `dap_stream_ch_t*` - Created channel instance
- `NULL` - Channel creation failed

**Error Conditions:**
- Returns NULL if a_stream is NULL or invalid
- Returns NULL if a_ch_id is already in use
- Returns NULL if maximum channels exceeded

#### `dap_stream_ch_delete()`

Destroys channel and removes from stream.

```c
void dap_stream_ch_delete(dap_stream_ch_t *a_ch);
```

#### `dap_stream_ch_write()`

Writes data to channel.

```c
ssize_t dap_stream_ch_write(dap_stream_ch_t *a_ch, const void *a_data, size_t a_data_size);
```

**Parameters:**
- `a_ch` (dap_stream_ch_t*) - Channel to write to
- `a_data` (const void*) - Data to write
- `a_data_size` (size_t) - Size of data

**Returns:**
- `>= 0` - Number of bytes written
- `-1` - Write operation failed

#### `dap_stream_ch_read()`

Reads data from channel.

```c
ssize_t dap_stream_ch_read(dap_stream_ch_t *a_ch, void *a_data, size_t a_data_size);
```

### Data Transfer

#### `dap_stream_packet_new()`

Creates new packet for stream transmission.

```c
dap_stream_packet_t* dap_stream_packet_new(uint8_t a_ch_id, const void *a_data, size_t a_data_size);
```

**Parameters:**
- `a_ch_id` (uint8_t) - Target channel identifier
- `a_data` (const void*) - Packet payload data
- `a_data_size` (size_t) - Size of payload

**Returns:**
- `dap_stream_packet_t*` - Created packet
- `NULL` - Packet creation failed

#### `dap_stream_packet_send()`

Sends packet through stream.

```c
int dap_stream_packet_send(dap_stream_t *a_stream, dap_stream_packet_t *a_packet);
```

**Parameters:**
- `a_stream` (dap_stream_t*) - Stream to send through
- `a_packet` (dap_stream_packet_t*) - Packet to send

**Returns:**
- `0` - Packet sent successfully
- `-1` - Send operation failed

## Error Codes

### Stream Error Codes

```c
typedef enum dap_stream_error {
    DAP_STREAM_ERROR_SUCCESS = 0,            // Operation successful
    DAP_STREAM_ERROR_INVALID_PARAM = -1,     // Invalid parameter provided
    DAP_STREAM_ERROR_NO_MEMORY = -2,         // Memory allocation failed
    DAP_STREAM_ERROR_SOCKET_FAILED = -3,     // Socket operation failed
    DAP_STREAM_ERROR_CHANNEL_NOT_FOUND = -4, // Channel not found
    DAP_STREAM_ERROR_CHANNEL_EXISTS = -5,    // Channel already exists
    DAP_STREAM_ERROR_SESSION_INVALID = -6,   // Invalid session
    DAP_STREAM_ERROR_AUTH_FAILED = -7,       // Authentication failed
    DAP_STREAM_ERROR_PACKET_INVALID = -8,    // Invalid packet format
    DAP_STREAM_ERROR_BUFFER_FULL = -9,       // Buffer is full
    DAP_STREAM_ERROR_TIMEOUT = -10,          // Operation timeout
    DAP_STREAM_ERROR_ENCRYPTION_FAILED = -11, // Encryption failed
    DAP_STREAM_ERROR_CHANNEL_LIMIT = -12,    // Channel limit reached
    DAP_STREAM_ERROR_STREAM_CLOSED = -13     // Stream is closed
} dap_stream_error_t;
```

## Typical Examples

### Basic Stream Communication Example

```c
#include "dap_common.h"
#include "dap_stream.h"

#include <string.h> // strlen

/**
 * Channel data callback: logs the size of the received payload and echoes
 * it back on the same channel.
 *
 * @param a_ch        Channel the data arrived on
 * @param a_data      Received payload
 * @param a_data_size Payload size in bytes
 * @param a_arg       Unused in this example
 */
void channel_data_callback(dap_stream_ch_t *a_ch, void *a_data, size_t a_data_size, void *a_arg) {
    (void)a_arg;

    log_it(L_INFO, "Received %zu bytes on channel %u", a_data_size, a_ch->pub.id);

    // Echo data back; report a failed write instead of dropping it silently
    if (dap_stream_ch_write(a_ch, a_data, a_data_size) < 0) {
        log_it(L_ERROR, "✗ Failed to echo data on channel %u", a_ch->pub.id);
    }
}

/**
 * Creates a session, a stream and two channels, writes one message to each
 * channel and prints the stream counters.
 *
 * @return 0 when the session, stream and both channels were created,
 *         -1 otherwise.
 */
int example_stream_communication(void) {
    log_it(L_INFO, "=== Stream Communication Example ===");

    int rc = -1;

    // Both channel pointers are initialized before the first `goto cleanup`
    // so the cleanup code never tests an indeterminate value.
    dap_stream_ch_t *data_channel = NULL;
    dap_stream_ch_t *control_channel = NULL;

    // Step 1: Create session
    dap_stream_session_t *session = dap_stream_session_new();
    if (!session) {
        log_it(L_ERROR, "✗ Failed to create session");
        return -1;
    }
    log_it(L_INFO, "✓ Session created with ID: %s", session->pub.session_id);

    // Step 2: Authenticate session
    int result = dap_stream_session_authenticate(session, "testuser", "password123");
    if (result != 0) {
        log_it(L_ERROR, "✗ Session authentication failed: %d", result);
        dap_stream_session_delete(session);
        return -1;
    }
    log_it(L_INFO, "✓ Session authenticated for user: %s", session->pub.user_name);

    // Step 3: Create event socket (simplified - normally from connection)
    dap_events_socket_t *socket = dap_events_socket_create_tcp(NULL);
    if (!socket) {
        log_it(L_ERROR, "✗ Failed to create socket");
        dap_stream_session_delete(session);
        return -1;
    }

    // Step 4: Create stream
    dap_stream_t *stream = dap_stream_new(socket, session);
    if (!stream) {
        log_it(L_ERROR, "✗ Failed to create stream");
        dap_events_socket_delete(socket);
        dap_stream_session_delete(session);
        return -1;
    }
    log_it(L_INFO, "✓ Stream created successfully");
    // NOTE(review): from here on the socket is assumed to be released by
    // dap_stream_delete() - confirm against the implementation.

    // Channel callbacks live on this function's stack; both channels are
    // deleted in cleanup before the function returns.
    dap_stream_ch_callbacks_t callbacks = {
        .data_callback = channel_data_callback,
        .ready_callback = NULL,
        .error_callback = NULL
    };

    // Step 5: Create channels
    data_channel = dap_stream_ch_new(stream, 1);
    if (!data_channel) {
        log_it(L_ERROR, "✗ Failed to create data channel");
        goto cleanup;
    }
    log_it(L_INFO, "✓ Data channel created (ID: %u)", data_channel->pub.id);

    control_channel = dap_stream_ch_new(stream, 2);
    if (!control_channel) {
        log_it(L_ERROR, "✗ Failed to create control channel");
        goto cleanup;
    }
    log_it(L_INFO, "✓ Control channel created (ID: %u)", control_channel->pub.id);

    // Step 6: Set channel callbacks
    data_channel->priv.callbacks = &callbacks;
    control_channel->priv.callbacks = &callbacks;

    // Step 7: Send test data
    const char *test_message = "Hello from DAP Stream!";
    ssize_t sent = dap_stream_ch_write(data_channel, test_message, strlen(test_message));
    if (sent > 0) {
        log_it(L_INFO, "✓ Sent %zd bytes to data channel", sent);
    } else {
        log_it(L_ERROR, "✗ Failed to send data to channel");
    }

    // Step 8: Send control message
    const char *control_msg = "{\"command\":\"status\",\"id\":1}";
    sent = dap_stream_ch_write(control_channel, control_msg, strlen(control_msg));
    if (sent > 0) {
        log_it(L_INFO, "✓ Sent %zd bytes to control channel", sent);
    }

    // Step 9: Display stream statistics
    // NOTE(review): ownership of the returned stats object is not documented;
    // confirm whether the caller must release it.
    dap_stream_stats_t *stats = dap_stream_get_stats(stream);
    if (stats) {
        log_it(L_INFO, "Stream Statistics:");
        log_it(L_INFO, " Active channels: %u", stream->pub.channel_count);
        log_it(L_INFO, " Bytes sent: %zu", stream->pub.bytes_sent);
        log_it(L_INFO, " Bytes received: %zu", stream->pub.bytes_received);
        // The width of dap_time_t is not shown in this document; the cast
        // keeps the argument and the specifier in agreement on every platform.
        log_it(L_INFO, " Last activity: %llu", (unsigned long long)stream->pub.ts_last_activity);
    }

    rc = 0;

cleanup:
    // Step 10: Cleanup
    if (data_channel)
        dap_stream_ch_delete(data_channel);
    if (control_channel)
        dap_stream_ch_delete(control_channel);
    dap_stream_delete(stream);
    dap_stream_session_delete(session);

    if (rc == 0) {
        log_it(L_INFO, "✓ Stream Communication example completed");
    }
    return rc;
}
```

### Multi-Channel Stream Example

```c
#include "dap_common.h"
#include "dap_stream.h"

#include <inttypes.h> // PRIu64
#include <stdio.h>    // snprintf
#include <string.h>   // strcmp, strlen
#include <unistd.h>   // usleep, sleep

typedef struct channel_context {
    const char *channel_name;
    uint32_t message_count;
    uint64_t total_bytes;
} channel_context_t;

/**
 * Data callback shared by all channels: updates the per-channel counters
 * passed through a_arg and logs the payload according to the channel name.
 *
 * @param a_ch        Channel the data arrived on (unused)
 * @param a_data      Received payload
 * @param a_data_size Payload size in bytes
 * @param a_arg       Pointer to the channel's channel_context_t
 */
void multi_channel_callback(dap_stream_ch_t *a_ch, void *a_data, size_t a_data_size, void *a_arg) {
    (void)a_ch;

    channel_context_t *ctx = (channel_context_t*)a_arg;
    if (!ctx) {
        // No context was attached to this channel; nothing to account against
        return;
    }

    ctx->message_count++;
    ctx->total_bytes += a_data_size;

    log_it(L_INFO, "Channel %s: Message %u (%zu bytes)", ctx->channel_name, ctx->message_count, a_data_size);

    // Process data based on channel type
    if (strcmp(ctx->channel_name, "file_transfer") == 0) {
        // Handle file transfer data
        log_it(L_DEBUG, "Processing file chunk: %zu bytes", a_data_size);
    } else if (strcmp(ctx->channel_name, "messaging") == 0) {
        // Handle messaging data
        char *message = (char*)a_data;
        log_it(L_INFO, "Message received: %.*s", (int)a_data_size, message);
    }
}

/* Logs the callback-side counters and the sent-byte counter of one channel. */
static void log_channel_stats(const char *a_title, const channel_context_t *a_ctx, const dap_stream_ch_t *a_ch) {
    log_it(L_INFO, " %s:", a_title);
    log_it(L_INFO, " Messages: %u", a_ctx->message_count);
    log_it(L_INFO, " Total bytes: %" PRIu64, a_ctx->total_bytes);
    log_it(L_INFO, " Bytes sent: %zu", a_ch->pub.bytes_sent);
}

/**
 * Creates one stream with three channels (file transfer, messaging, status),
 * writes sample traffic to each and prints per-channel and stream counters.
 *
 * @return 0 when the session, stream and all channels were created,
 *         -1 otherwise.
 */
int example_multi_channel_stream(void) {
    log_it(L_INFO, "=== Multi-Channel Stream Example ===");

    int rc = -1;

    // Everything the cleanup labels touch is initialized up front so every
    // `goto` lands on well-defined values.
    dap_events_socket_t *socket = NULL;
    dap_stream_t *stream = NULL;
    dap_stream_ch_t *file_channel = NULL;
    dap_stream_ch_t *msg_channel = NULL;
    dap_stream_ch_t *status_channel = NULL;

    // Step 1: Setup session and stream
    dap_stream_session_t *session = dap_stream_session_new();
    if (!session) {
        log_it(L_ERROR, "✗ Failed to create session");
        return -1;
    }

    int auth_result = dap_stream_session_authenticate(session, "multiuser", "secret");
    if (auth_result != 0) {
        log_it(L_ERROR, "✗ Session authentication failed: %d", auth_result);
        goto cleanup_session;
    }

    socket = dap_events_socket_create_tcp(NULL);
    if (!socket) {
        log_it(L_ERROR, "✗ Failed to create socket");
        goto cleanup_session;
    }

    stream = dap_stream_new(socket, session);
    if (!stream) {
        log_it(L_ERROR, "✗ Failed to create stream");
        // No stream took the socket, so release it here
        dap_events_socket_delete(socket);
        goto cleanup_session;
    }

    // Step 2: Create multiple channels with contexts
    channel_context_t file_ctx = {"file_transfer", 0, 0};
    channel_context_t msg_ctx = {"messaging", 0, 0};
    channel_context_t status_ctx = {"status", 0, 0};

    // Callbacks and contexts live on this function's stack; the channels
    // are deleted in cleanup before the function returns.
    dap_stream_ch_callbacks_t callbacks = {
        .data_callback = multi_channel_callback
    };

    file_channel = dap_stream_ch_new(stream, 10);
    msg_channel = dap_stream_ch_new(stream, 20);
    status_channel = dap_stream_ch_new(stream, 30);

    if (!file_channel || !msg_channel || !status_channel) {
        log_it(L_ERROR, "✗ Failed to create channels");
        goto cleanup_stream;
    }

    // Step 3: Set callbacks with contexts
    file_channel->priv.callbacks = &callbacks;
    file_channel->priv.callback_arg = &file_ctx;

    msg_channel->priv.callbacks = &callbacks;
    msg_channel->priv.callback_arg = &msg_ctx;

    status_channel->priv.callbacks = &callbacks;
    status_channel->priv.callback_arg = &status_ctx;

    log_it(L_INFO, "✓ Created 3 channels: file_transfer(10), messaging(20), status(30)");

    // Step 4: Simulate different types of data transmission

    // File transfer simulation
    log_it(L_INFO, "Simulating file transfer...");
    for (int i = 0; i < 5; i++) {
        char file_chunk[256];
        snprintf(file_chunk, sizeof(file_chunk), "FILE_CHUNK_%03d: %s", i,
                 "Binary data would be here in real implementation");
        dap_stream_ch_write(file_channel, file_chunk, strlen(file_chunk));
        usleep(100000); // 100ms delay
    }

    // Messaging simulation
    log_it(L_INFO, "Simulating messaging...");
    const char *messages[] = {
        "Hello from multi-channel stream!",
        "This is message number 2",
        "Channels work independently",
        "Each channel has its own context",
        "Final message in this demo"
    };
    // The bound is derived from the array so adding a message cannot
    // desynchronize the loop.
    for (size_t i = 0; i < sizeof(messages) / sizeof(messages[0]); i++) {
        dap_stream_ch_write(msg_channel, messages[i], strlen(messages[i]));
        usleep(200000); // 200ms delay
    }

    // Status updates simulation
    log_it(L_INFO, "Simulating status updates...");
    for (int i = 0; i < 3; i++) {
        char status_msg[128];
        // The width of dap_time_now()'s result is not shown in this
        // document; the cast keeps argument and specifier in agreement.
        snprintf(status_msg, sizeof(status_msg),
                 "{\"status\":\"active\",\"timestamp\":%llu,\"sequence\":%d}",
                 (unsigned long long)dap_time_now(), i + 1);
        dap_stream_ch_write(status_channel, status_msg, strlen(status_msg));
        sleep(1);
    }

    // Step 5: Display channel statistics
    log_it(L_INFO, "Channel Statistics:");
    log_channel_stats("File Transfer Channel", &file_ctx, file_channel);
    log_channel_stats("Messaging Channel", &msg_ctx, msg_channel);
    log_channel_stats("Status Channel", &status_ctx, status_channel);

    // Step 6: Overall stream statistics
    log_it(L_INFO, "Overall Stream Statistics:");
    log_it(L_INFO, " Total channels: %u", stream->pub.channel_count);
    log_it(L_INFO, " Stream bytes sent: %zu", stream->pub.bytes_sent);
    log_it(L_INFO, " Stream bytes received: %zu", stream->pub.bytes_received);
    log_it(L_INFO, " Stream active: %s", stream->pub.is_active ? "yes" : "no");

    rc = 0;

cleanup_stream:
    if (file_channel)
        dap_stream_ch_delete(file_channel);
    if (msg_channel)
        dap_stream_ch_delete(msg_channel);
    if (status_channel)
        dap_stream_ch_delete(status_channel);
    dap_stream_delete(stream);

cleanup_session:
    dap_stream_session_delete(session);

    if (rc == 0) {
        log_it(L_INFO, "✓ Multi-Channel Stream example completed");
    }
    return rc;
}
```

---

**See also:** [[Module DAP Net - Client|Module DAP Net - Client]], [[Module DAP Net - Server|Module DAP Net - Server]], [[Module DAP IO|Module DAP IO]]

---

*Based on: `dap-sdk/net/stream/include/dap_stream.h`, `dap-sdk/net/stream/src/dap_stream.c`*