Working with I/O, multitasking and networking. In theory, these are three different tasks: I/O, multitasking, and networking. In practice they are tightly intertwined, so they are explained together.
The I/O Reactor is based on functions such as select() and its analogs, which let you group the handles of I/O objects (sockets, files, named pipes, etc.) and guarantee sequential processing of their input and output. This eliminates the need for semaphores, mutexes and other kernel objects to synchronize data between the handlers of different handles, and it has a number of other advantages. In particular, with a large number of simultaneous connections there is no frequent context switching between processing threads, which can otherwise eat up to 30% of system time.
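As a minimal sketch of the grouping idea (generic POSIX code, not DapSDK code), the function below watches two pipes with a single poll() call and drains whichever descriptor is ready, all in one thread, so no locks are involved. The name reactor_demo and the pipe setup are assumptions made for illustration only.

```c
#include <assert.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical demo: multiplex two pipes in one thread with poll().
 * Returns the total number of bytes read, or -1 on error. */
long reactor_demo(void)
{
    int a[2], b[2];
    if (pipe(a) != 0)
        return -1;
    if (pipe(b) != 0) {
        close(a[0]);
        close(a[1]);
        return -1;
    }

    /* Make both descriptors readable before polling. */
    const char *msg_a = "ping";
    const char *msg_b = "pong!";
    long expected = (long)(strlen(msg_a) + strlen(msg_b));
    long total = -1;

    if (write(a[1], msg_a, strlen(msg_a)) == (ssize_t)strlen(msg_a) &&
        write(b[1], msg_b, strlen(msg_b)) == (ssize_t)strlen(msg_b)) {
        struct pollfd fds[2] = {
            { .fd = a[0], .events = POLLIN },
            { .fd = b[0], .events = POLLIN },
        };
        total = 0;
        /* One thread handles every ready descriptor in turn. */
        while (total < expected) {
            if (poll(fds, 2, 1000) <= 0) {
                total = -1;
                break;
            }
            for (int i = 0; i < 2; i++) {
                if (fds[i].revents & POLLIN) {
                    char buf[64];
                    ssize_t n = read(fds[i].fd, buf, sizeof buf);
                    if (n > 0)
                        total += n;
                }
            }
        }
    }

    close(a[0]);
    close(a[1]);
    close(b[0]);
    close(b[1]);
    return total;
}
```

Because both handlers run one after another in the same loop, they can share state without any synchronization primitives.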
The reactor consists of worker threads, each of which is bound to its own processor in the system: switching contexts without changing the processor is much faster than with it. Moreover, a dedicated handler thread signals to the drivers and the I/O subsystem that a number of other optimization techniques can be used. One example is Flow Control, in which the network card's own processor balances traffic between hardware queues; the queues are mapped directly into memory areas allocated to individual processors, and interrupts or messages about updated indexes of the circular buffers are delivered to those same processors, depending on the hardware. Not all drivers, operating systems, network cards and motherboards support this, and virtual machines add to the diversity. All this should be taken into account when working with input/output: for example, packets from a multiqueue tun device may arrive as an entire session on the same descriptor, be spread randomly over the descriptors on neighboring processors, or be grouped onto 2-3 descriptors (out of 8). Inside the worker threads, event handling on the descriptor (epoll() and its analogs) runs in a conditionally infinite loop. An event socket is bound to each descriptor.
The socket is described by the dap_events_socket_t structure; you will often encounter pointers to its instances. This structure groups the data associated with the system I/O handle, in particular the read/write callbacks and the callbacks for a number of other events. It also performs additional data buffering, which is described in more detail in the appendix.
In the mechanics of esockets, it is important to consider from which part of the code you access this pointer, that is, the execution context. Depending on the context, you may or may not be able to access its fields (in future versions they will apparently disappear completely, moving to a private substructure), and whether you have to call the functions with the _unsafe postfix depends on whether you are working in the esocket's own context or outside of it.
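The context rule can be illustrated with a small self-contained sketch. Everything here is hypothetical: the demo_ names are not DapSDK functions, the execution context is modelled as a plain integer worker id, and a pipe stands in for the inter-context queue. The sketch also assumes that the _unsafe flavour is the one meant for the owner's context, with an _mt flavour for everyone else.

```c
#include <assert.h>
#include <string.h>
#include <unistd.h>

typedef struct demo_esocket {
    int owner_ctx;        /* id of the worker context that owns this object */
    char buf_out[128];    /* output buffer, touched only by the owner */
    size_t buf_out_size;
    int queue_fd[2];      /* pipe standing in for an inter-context queue */
} demo_esocket_t;

/* "_unsafe" flavour: touches the fields directly, owner context only. */
size_t demo_write_unsafe(demo_esocket_t *es, const void *data, size_t len)
{
    size_t room = sizeof es->buf_out - es->buf_out_size;
    if (len > room)
        len = room;
    memcpy(es->buf_out + es->buf_out_size, data, len);
    es->buf_out_size += len;
    return len;
}

/* "_mt" flavour: never touches the fields, only posts to the queue. */
size_t demo_write_mt(demo_esocket_t *es, const void *data, size_t len)
{
    ssize_t n = write(es->queue_fd[1], data, len);
    return n > 0 ? (size_t)n : 0;
}

/* Picks the flavour from the caller's context id. */
size_t demo_write(demo_esocket_t *es, int caller_ctx,
                  const void *data, size_t len)
{
    if (caller_ctx == es->owner_ctx)
        return demo_write_unsafe(es, data, len);
    return demo_write_mt(es, data, len);
}

/* Run by the owner inside its loop: moves queued bytes into the buffer. */
size_t demo_drain_queue(demo_esocket_t *es)
{
    char tmp[64];
    ssize_t n = read(es->queue_fd[0], tmp, sizeof tmp);
    return n > 0 ? demo_write_unsafe(es, tmp, (size_t)n) : 0;
}

/* Returns the final buffer size, or -1 on error. */
int demo_run(void)
{
    demo_esocket_t es = { .owner_ctx = 0 };
    if (pipe(es.queue_fd) != 0)
        return -1;
    demo_write(&es, 0, "own", 3);     /* same context: direct access */
    demo_write(&es, 1, "other", 5);   /* foreign context: queued */
    size_t before = es.buf_out_size;  /* the foreign write is still pending */
    demo_drain_queue(&es);            /* owner applies it in its own context */
    close(es.queue_fd[0]);
    close(es.queue_fd[1]);
    return before == 3 ? (int)es.buf_out_size : -1;
}
```

The point of the split is that only one context ever modifies the buffer, so the buffer itself needs no lock.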
The event socket can be of different types for very different purposes. What they all have in common is that they work through file / socket descriptors and are associated with a specific type of these descriptors in the system. The types are described in the enumerated type
DESCRIPTOR_TYPE_SOCKET - a socket for an outgoing or incoming connection
DESCRIPTOR_TYPE_SOCKET_LISTENING - a socket listening on a port and accepting new connections
DESCRIPTOR_TYPE_QUEUE - a queue of data or pointers to them
DESCRIPTOR_TYPE_PIPE - binary data pipe
DESCRIPTOR_TYPE_TIMER - timer
DESCRIPTOR_TYPE_EVENT - event
DESCRIPTOR_TYPE_FILE - file descriptor
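A sketch of how such an enumeration might look in C, with a helper that maps each value to a readable name. The type name demo_descriptor_type_t, the numeric order of the values and both helper functions are assumptions; only the seven constant names come from the list above.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical type name; the numeric values are assumptions. */
typedef enum demo_descriptor_type {
    DESCRIPTOR_TYPE_SOCKET,
    DESCRIPTOR_TYPE_SOCKET_LISTENING,
    DESCRIPTOR_TYPE_QUEUE,
    DESCRIPTOR_TYPE_PIPE,
    DESCRIPTOR_TYPE_TIMER,
    DESCRIPTOR_TYPE_EVENT,
    DESCRIPTOR_TYPE_FILE
} demo_descriptor_type_t;

/* Human-readable name, handy for logging. */
const char *demo_descriptor_type_str(demo_descriptor_type_t type)
{
    switch (type) {
    case DESCRIPTOR_TYPE_SOCKET:           return "socket";
    case DESCRIPTOR_TYPE_SOCKET_LISTENING: return "listening socket";
    case DESCRIPTOR_TYPE_QUEUE:            return "queue";
    case DESCRIPTOR_TYPE_PIPE:             return "pipe";
    case DESCRIPTOR_TYPE_TIMER:            return "timer";
    case DESCRIPTOR_TYPE_EVENT:            return "event";
    case DESCRIPTOR_TYPE_FILE:             return "file";
    default:                               return "unknown";
    }
}

/* Returns 1 for the types backed by a network socket, 0 otherwise. */
int demo_descriptor_is_network(demo_descriptor_type_t type)
{
    return type == DESCRIPTOR_TYPE_SOCKET ||
           type == DESCRIPTOR_TYPE_SOCKET_LISTENING;
}
```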
More about e-socket - here.
Reactor handler threads, or workers (pointers to dap_worker_t), are what process I/O. Each of them is bound exclusively to a specific CPU in the system, so their number is usually equal to or slightly less than the number of processors. This number can be set at initialization in the a_threads_count parameter of dap_events_init; pass 0 and it will be selected automatically according to the number of CPUs in the system. To start the workers, initializing the module with dap_events_init is not enough: you also need to create a dap_events_t object with the dap_events_new function and then start it by calling dap_events_start, after which you can safely initialize everything else that is required. If necessary, you can block until the reactor workers finish by calling dap_events_wait - if the application is built mainly on DapSDK, this usually needs to be done in main.c so as not to terminate the main thread of execution ahead of time - or forcibly signal completion via
The threads themselves are described by dap_worker_t objects. Within their own execution context they can be obtained from the .worker field of the event socket. They can also be obtained directly by CPU number via dap_events_worker_get, or by requesting automatic selection among the reactor handler threads via dap_events_worker_get_auto, in which case the handler with the lowest load is selected. The latter is used to distribute the load evenly between the reactor handlers and the CPUs.
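The "lowest load" selection can be sketched as a simple linear scan. This is an illustration, not the DapSDK implementation: the demo_worker_t structure, the function name, and the choice of the number of attached event sockets as the load metric are all assumptions.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical worker record; the load metric is an assumption. */
typedef struct demo_worker {
    unsigned id;             /* CPU / worker index */
    unsigned esocket_count;  /* number of event sockets currently attached */
} demo_worker_t;

/* Returns the worker with the fewest attached sockets, or NULL if none.
 * On a tie the worker with the lowest index wins. */
demo_worker_t *demo_worker_get_auto(demo_worker_t *workers, size_t count)
{
    if (workers == NULL || count == 0)
        return NULL;
    demo_worker_t *best = &workers[0];
    for (size_t i = 1; i < count; i++) {
        if (workers[i].esocket_count < best->esocket_count)
            best = &workers[i];
    }
    return best;
}
```

A new connection would then be attached to the returned worker, and its counter incremented, so that later calls keep spreading the load.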
The callback processor, like the reactor, consists of handler threads that are also bound exclusively to a CPU. Unlike the reactor, the callback processor is designed for longer operations that are nevertheless still limited in execution time, since the callbacks in each handler thread are executed sequentially in the context of that same thread. Operations of unlimited duration should be executed in their own threads. A typical use of callbacks is to move an event socket and related objects from the reactor to the callback processor for fairly lengthy operations, such as accessing the database and generating a response, and then, upon completion, to move the event socket back into the reactor.
The callback queue is needed for thread-safe reception of new callbacks into the execution queue. For this there are the functions dap_proc_queue_add_callback and dap_proc_queue_add_callback, with which you can place a callback on an automatically selected queue or on a specific queue. The dap_worker_t object has a pointer to the queue instance associated with a specific CPU in its .proc_queue field. Context switching between threads on the same CPU is still less time-consuming than switching between threads on different CPUs, so it is recommended to use the queue of the thread that corresponds to the execution context from which the call is made. The exception is when many callbacks are added from the same context and balancing the load between CPUs matters more.
The callback processor threads are similar to reactor workers, except that they only process read-ready events in order to receive new callbacks. Inside the loop, callbacks are executed sequentially; a callback that returns true is removed from the list. Important: a callback for the processor should run for a limited time. The limit is not as tight as in the reactor, but it should not be too long either, ideally no more than a second or two. If you need to do more, it is better to save the state and return false; the callback will then be executed again on the next cycle. If this is impossible, or the execution time cannot be predicted at all, it is better to create your own thread and work in it so as not to slow down the other callbacks in the processor.
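The true/false convention can be sketched as follows. This is a simplified model with hypothetical demo_ names, not the DapSDK loop itself: each pass runs every pending callback once and keeps only those that returned false, and the sample job splits its work into bounded slices.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef bool (*demo_proc_callback_t)(void *arg);

typedef struct demo_proc_item {
    demo_proc_callback_t callback;
    void *arg;
} demo_proc_item_t;

/* One loop pass: run each callback once, keep those that return false.
 * Returns the number of callbacks still pending. */
size_t demo_proc_pass(demo_proc_item_t *items, size_t count)
{
    size_t kept = 0;
    for (size_t i = 0; i < count; i++) {
        if (!items[i].callback(items[i].arg))
            items[kept++] = items[i];
    }
    return kept;
}

/* Repeats passes until the list is empty; returns the number of passes. */
int demo_proc_run(demo_proc_item_t *items, size_t count)
{
    int passes = 0;
    while (count > 0) {
        count = demo_proc_pass(items, count);
        passes++;
    }
    return passes;
}

/* A long job that remembers its state between calls. */
typedef struct demo_job {
    int remaining;  /* units of work left */
    int calls;      /* how many times the callback has run */
} demo_job_t;

bool demo_job_step(void *arg)
{
    demo_job_t *job = arg;
    job->calls++;
    job->remaining -= 10;        /* one bounded slice of work */
    return job->remaining <= 0;  /* true: finished, remove from the list */
}
```

Splitting the work this way lets a short job queued behind a long one finish after its first slice instead of waiting for the long one to complete.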
For working with the network, the basic TCP/UDP protocols are implemented, as well as HTTP over TCP and DNS over UDP. Incoming connections are also received through the reactor: for this, the dap_server_type_t object creates a set of event sockets of the DESCRIPTOR_TYPE_SOCKET_LISTENING type, one socket for each worker (on some platforms the situation may differ and only one thread will process incoming connections, depending on the capabilities of the local POSIX analogs).
To receive and process incoming connections, you need to create a dap_server_t object using the dap_server_new function and then either hand its further initialization over to another protocol (for example, HTTP), or fill in the callback table yourself in the field.
The HTTP server is implemented as dap_http_t, inherited from dap_server_t and created using the dap_http_new function; URL handlers are then added to it via
It directly processes the read and write callbacks for the header and data sections; all of them are listed in the arguments of the dap_http_add_proc function. This function also requires the URL at which the installed callbacks will be called.
The simplified HTTP interface reduces the number of required callbacks to one, which is passed through
In this callback it is enough to process the entire request; the rest is done for you. A detailed description of the requirements for the processing function can be read in
The DAP protocols have three parts: encryption initialization, encrypted requests/responses, and the streaming connection, which is also encrypted. Initialization generally goes through the /enc_init link, although in some cases it differs for security purposes (for example
The encryption initialization server initializes connections and stores public and private keys under indexes. After a certain period of non-use, keys are automatically deleted from memory. Such a server is created using enc_http_add_proc(), usually with URL
Through a special HTTP header, the server receives the identifier of the key with which the request is encrypted, decrypts the request and passes it to the callback, which processes it and forms a response. The response is then encrypted and sent to the requester. Encrypted URLs are created through the usual dap_http_simple_proc_add; inside the handler, the request is decrypted through enc_http_request_decode, the response is generated through enc_http_reply_f, and finally the response is encrypted using
The stream server multiplexes separate virtual two-way byte streams into one physical connection; it also encrypts and decrypts, restores data order, and defragments received data. It can work both over the bare UDP protocol and over the HTTP protocol. For initialization, you need to call dap_stream_init for client mode. To work with the streaming client and server, you also need to initialize dap_stream_ctl to receive incoming requests for managing streaming sessions, add handlers for these requests through dap_stream_ctl_add_proc, and add the stream handlers, that is, the streaming server itself, through dap_stream_add_proc_udp and its counterparts for other transport protocols. The streaming server currently works over HTTP and over UDP; other possibilities will appear in the future. Within streams, data is divided into encrypted packets whose delivery integrity is guaranteed by the protocol. Incomplete data is either accumulated in anticipation of the remainder or discarded. The data sequence is also tracked at this level to prevent channel compromise. Inside a streaming packet are streaming channel packets, so a set of logical binary data streams is established through one physical connection.
A stream session is established through a call to the handler added on the receiving side via dap_stream_ctl_add_proc; on the client it corresponds to the STAGE_STREAM_CTL to STAGE_STREAM_SESSION connection stages. The session contains, in particular, information about the stream data encryption algorithm, its key and other stream characteristics. It can also optionally contain information about the requested content, and payment and network information can be placed in types derived from the dap_stream_session_t object. A session must be created before connecting directly to the stream; in the client part, the STAGE_STREAM_STREAMING stages are responsible for this.
Within one physical connection there are many logical ones - stream channels. Each stream channel is associated with its own data type: blockchain and GDB data, data on networks, services, VPN traffic, audio, video, control commands and other binary data streams. Each channel is identified by a value of type char so that it can conveniently be represented by a symbol. For example, the blockchain sync channel uses the identifier C, the service channel uses R, and the VPN service channel uses S. Each such channel is created via dap_stream_ch_proc_add, where callbacks are set for the following operations: establishing a connection, terminating a connection, processing an incoming packet and preparing outgoing packets for sending. Like a physical connection served by an event socket, a logical stream has its own read/write flags (controlled via dap_stream_ch_set_ready_to_write_unsafe) and its own set of _mt/_unsafe functions for sending and reading data; see the description of the dap_stream module for details. The description of the structure of channel packets is located
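The idea of routing packets by a one-character channel identifier can be sketched with a lookup table indexed by that character. The demo_ functions and structures below are hypothetical and only model the incoming-packet callback; they are not the signatures of dap_stream_ch_proc_add or of any other DapSDK function.

```c
#include <assert.h>
#include <stddef.h>

typedef void (*demo_ch_packet_in_t)(const void *data, size_t size, void *ctx);

typedef struct demo_ch_proc {
    demo_ch_packet_in_t packet_in;  /* handler for incoming channel packets */
    void *ctx;                      /* opaque per-channel state */
} demo_ch_proc_t;

/* One slot per possible channel id. */
static demo_ch_proc_t s_procs[256];

/* Registers a handler for the channel with the given id. */
void demo_ch_proc_add(char id, demo_ch_packet_in_t packet_in, void *ctx)
{
    s_procs[(unsigned char)id].packet_in = packet_in;
    s_procs[(unsigned char)id].ctx = ctx;
}

/* Routes a packet to its channel; returns 1 if delivered, 0 if no handler. */
int demo_ch_dispatch(char id, const void *data, size_t size)
{
    demo_ch_proc_t *proc = &s_procs[(unsigned char)id];
    if (proc->packet_in == NULL)
        return 0;
    proc->packet_in(data, size, proc->ctx);
    return 1;
}

/* Sample handler: adds the packet size to a counter passed as ctx. */
void demo_count_bytes(const void *data, size_t size, void *ctx)
{
    (void)data;
    *(size_t *)ctx += size;
}
```

Each logical stream keeps its own state in ctx, so several channels can share one physical connection without seeing each other's data.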
Client connections are implemented in the dap_client module, which is initialized via the dap_client_init function. For each client connection, a dap_client_t object is created via the dap_client_new function. After that, set the uplink address via dap_client_set_uplink, install the certificate that will sign the keys during the primary exchange of public keys via dap_client_set_auth_cert, and set the list of activated channels via dap_client_set_states. To establish a connection up to the moment when the streaming connection starts, start the state machine with the final state STAGE_STREAM_STREAMING; all further actions are described by the callbacks of the client creation function and the callbacks of the state machine start function. The states of the state machine are described in more detail in the description of the enumerated types dap_client_stage_status_t - a two-level state system in which each stage has its own status. Possible state machine errors are described in
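The two-level stage/status scheme can be modelled with a small state machine that advances one stage at a time until it reaches the requested target. This is a sketch under assumptions: all DEMO_ names are hypothetical, the BEGIN and ENC_INIT stages and the set of status values are guesses, and only the STREAM_CTL, STREAM_SESSION and STREAM_STREAMING stage names mirror those mentioned in the text.

```c
#include <assert.h>

/* Stages in the order they are passed (partly assumed). */
typedef enum demo_stage {
    DEMO_STAGE_BEGIN,
    DEMO_STAGE_ENC_INIT,
    DEMO_STAGE_STREAM_CTL,
    DEMO_STAGE_STREAM_SESSION,
    DEMO_STAGE_STREAM_STREAMING
} demo_stage_t;

/* Status of the current stage (assumed set of values). */
typedef enum demo_stage_status {
    DEMO_STATUS_NONE,
    DEMO_STATUS_IN_PROGRESS,
    DEMO_STATUS_DONE,
    DEMO_STATUS_ERROR
} demo_stage_status_t;

typedef struct demo_client {
    demo_stage_t stage;
    demo_stage_status_t status;
    int transitions;  /* number of stages completed so far */
} demo_client_t;

/* Performs the work needed to enter a stage; returns 0 on success. */
typedef int (*demo_stage_action_t)(demo_stage_t stage);

/* Advances stage by stage until the target is reached or a step fails. */
demo_stage_status_t demo_client_go_stage(demo_client_t *client,
                                         demo_stage_t target,
                                         demo_stage_action_t action)
{
    while (client->stage < target) {
        demo_stage_t next = (demo_stage_t)(client->stage + 1);
        client->status = DEMO_STATUS_IN_PROGRESS;
        if (action(next) != 0) {
            client->status = DEMO_STATUS_ERROR;  /* stay on the last good stage */
            return client->status;
        }
        client->stage = next;
        client->transitions++;
    }
    client->status = DEMO_STATUS_DONE;
    return client->status;
}

/* Sample actions: one always succeeds, one fails at the session stage. */
int demo_action_ok(demo_stage_t stage)
{
    (void)stage;
    return 0;
}

int demo_action_fail_session(demo_stage_t stage)
{
    return stage == DEMO_STAGE_STREAM_SESSION ? -1 : 0;
}
```

On failure the client keeps the last stage it completed together with an error status, which is what lets a caller decide whether to retry from that point.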