diff --git a/server/server.conf.example b/server/server.conf.example index 282fa92..57fce5d 100644 --- a/server/server.conf.example +++ b/server/server.conf.example @@ -2,6 +2,7 @@ # The first implementation receives one active sender and plays it to the default audio output. listen_port = 4860 +web_port = 4861 sample_rate = 48000 channels = 1 frame_ms = 10 diff --git a/server/src/main.cpp b/server/src/main.cpp index 8617be1..dba5686 100644 --- a/server/src/main.cpp +++ b/server/src/main.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -9,16 +10,20 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include +#include #include +#include #include #define MINIAUDIO_IMPLEMENTATION @@ -38,6 +43,8 @@ constexpr std::uint16_t kDefaultFrameMs = 10; constexpr std::uint16_t kDefaultChannels = 1; constexpr std::uint32_t kDefaultJitterBufferMs = 120; constexpr std::uint32_t kDefaultLogIntervalPackets = 100; +constexpr std::uint16_t kDefaultWebPort = 4861; +constexpr const char* kWebHost = "127.0.0.1"; std::atomic g_running{true}; @@ -75,6 +82,7 @@ struct Config { std::uint16_t sender_id = 0; std::uint32_t jitter_buffer_ms = kDefaultJitterBufferMs; std::uint32_t log_interval_packets = kDefaultLogIntervalPackets; + std::uint16_t web_port = kDefaultWebPort; std::string audio_output = "default"; }; @@ -149,6 +157,12 @@ bool load_config_file(const std::string& path, Config& config) { throw std::runtime_error("Invalid log_interval_packets: " + value); } config.log_interval_packets = *parsed; + } else if (key == "web_port") { + const auto parsed = parse_u32(value); + if (!parsed || *parsed == 0 || *parsed > 65535) { + throw std::runtime_error("Invalid web_port: " + value); + } + config.web_port = static_cast(*parsed); } else if (key == "audio_output") { config.audio_output = value; } else { @@ -383,8 +397,111 @@ struct SenderStats { std::uint64_t packets = 0; std::uint64_t dropped = 0; std::uint64_t out_of_order = 0; + std::uint64_t last_packet_ms = 0; }; +struct SenderControl { + bool muted = false; + double gain = 1.0; + std::string name; +}; + +struct SharedState { + mutable std::mutex mutex; + std::string config_path; + Config config; + std::string udp_state = "stopped"; + std::string udp_error; + std::string audio_state = "stopped"; + std::string audio_error; + std::uint16_t active_sender_id = 0; + bool output_muted = false; + std::map senders; + std::map controls; + std::uint64_t buffered_samples = 0; + std::uint64_t overflows = 0; + std::uint64_t underflows = 0; +}; + +std::uint64_t now_ms() { + const auto now = std::chrono::system_clock::now().time_since_epoch(); + return static_cast( + std::chrono::duration_cast(now).count()); +} + +std::string json_escape(const std::string& value) { + std::ostringstream out; + for (const char ch : value) { + switch (ch) { + case '\\': + out << "\\\\"; + break; + case '"': + out << "\\\""; + break; + case '\n': + out << "\\n"; + break; + case '\r': + out << "\\r"; + break; + case '\t': + out << "\\t"; + break; + default: + out << ch; + break; + } + } + return out.str(); +} + +std::string make_status_json(const SharedState& state) { + std::lock_guard lock(state.mutex); + std::ostringstream out; + out << "{"; + out << "\"config_path\":\"" << json_escape(state.config_path) << "\","; + out << "\"listen_port\":" << state.config.listen_port << ","; + out << "\"web_host\":\"" << kWebHost << "\","; + out << "\"web_port\":" << state.config.web_port << ","; + out << "\"sample_rate\":" << state.config.sample_rate << ","; + out << "\"channels\":" << state.config.channels << ","; + out << "\"frame_ms\":" << state.config.frame_ms << ","; + out << "\"udp_state\":\"" << json_escape(state.udp_state) << "\","; + out << "\"udp_error\":\"" << json_escape(state.udp_error) << "\","; + out << "\"audio_state\":\"" << json_escape(state.audio_state) << "\","; + out << "\"audio_error\":\"" << json_escape(state.audio_error) << "\","; + out << "\"active_sender_id\":" << state.active_sender_id << ","; + out << "\"output_muted\":" << (state.output_muted ? "true" : "false") << ","; + out << "\"buffered_samples\":" << state.buffered_samples << ","; + out << "\"overflows\":" << state.overflows << ","; + out << "\"underflows\":" << state.underflows << ","; + out << "\"now_ms\":" << now_ms() << ","; + out << "\"senders\":["; + bool first = true; + for (const auto& [sender_id, stats] : state.senders) { + const auto control_it = state.controls.find(sender_id); + const SenderControl control = control_it == state.controls.end() ? SenderControl{} : control_it->second; + if (!first) { + out << ","; + } + first = false; + out << "{"; + out << "\"sender_id\":" << sender_id << ","; + out << "\"name\":\"" << json_escape(control.name) << "\","; + out << "\"muted\":" << (control.muted ? "true" : "false") << ","; + out << "\"gain\":" << control.gain << ","; + out << "\"session_id\":" << stats.session_id << ","; + out << "\"packets\":" << stats.packets << ","; + out << "\"dropped\":" << stats.dropped << ","; + out << "\"out_of_order\":" << stats.out_of_order << ","; + out << "\"last_packet_ms\":" << stats.last_packet_ms; + out << "}"; + } + out << "]}"; + return out.str(); +} + bool validate_header(const PacketHeader& header, const Config& config, std::size_t received_size) { if (header.version != kVersion || header.header_len != kHeaderSize || header.packet_type != kPacketTypeAudio) { @@ -416,6 +533,7 @@ void update_stats(SenderStats& stats, const PacketHeader& header) { stats.session_id = header.session_id; stats.expected_sequence = header.sequence + 1; stats.packets = 1; + stats.last_packet_ms = now_ms(); return; } @@ -429,6 +547,7 @@ void update_stats(SenderStats& stats, const PacketHeader& header) { } ++stats.packets; + stats.last_packet_ms = now_ms(); } std::vector decode_pcm_s16le(const std::uint8_t* payload, std::size_t payload_bytes) { @@ -453,6 +572,14 @@ int make_udp_socket(std::uint16_t listen_port) { throw std::runtime_error("setsockopt(SO_REUSEADDR) failed: " + std::string(std::strerror(errno))); } + timeval timeout{}; + timeout.tv_sec = 1; + timeout.tv_usec = 0; + if (::setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != 0) { + ::close(fd); + throw std::runtime_error("setsockopt(SO_RCVTIMEO) failed: " + std::string(std::strerror(errno))); + } + sockaddr_in address{}; address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); @@ -485,13 +612,424 @@ private: int fd_ = -1; }; -void receive_loop(const Config& config, AudioRingBuffer& audio_buffer) { +const char* index_html() { + return R"HTML( + + + + +mic server + + + +

mic server

+
+
+
UDP
-
+
Audio
-
+
Senders
0
+
Format
-
+
+
+ + + +
+
+ + + + + + + + +
IDNameMuteGainPacketsDroppedOut of orderLast seen
+
+
No sender packets received yet.
+
+ + +)HTML"; +} + +std::string http_response(const std::string& content_type, const std::string& body, const std::string& status = "200 OK") { + std::ostringstream out; + out << "HTTP/1.1 " << status << "\r\n"; + out << "Content-Type: " << content_type << "\r\n"; + out << "Content-Length: " << body.size() << "\r\n"; + out << "Connection: close\r\n\r\n"; + out << body; + return out.str(); +} + +bool body_bool(const std::string& body, const std::string& key, bool fallback) { + const auto key_pos = body.find("\"" + key + "\""); + if (key_pos == std::string::npos) { + return fallback; + } + const auto true_pos = body.find("true", key_pos); + const auto false_pos = body.find("false", key_pos); + if (true_pos != std::string::npos && (false_pos == std::string::npos || true_pos < false_pos)) { + return true; + } + if (false_pos != std::string::npos) { + return false; + } + return fallback; +} + +double body_double(const std::string& body, const std::string& key, double fallback) { + const auto key_pos = body.find("\"" + key + "\""); + if (key_pos == std::string::npos) { + return fallback; + } + const auto colon = body.find(':', key_pos); + if (colon == std::string::npos) { + return fallback; + } + try { + return std::stod(body.substr(colon + 1)); + } catch (...) { + return fallback; + } +} + +std::string body_string(const std::string& body, const std::string& key) { + const auto key_pos = body.find("\"" + key + "\""); + if (key_pos == std::string::npos) { + return {}; + } + const auto colon = body.find(':', key_pos); + const auto first = body.find('"', colon == std::string::npos ? key_pos : colon); + if (first == std::string::npos) { + return {}; + } + const auto second = body.find('"', first + 1); + if (second == std::string::npos) { + return {}; + } + return body.substr(first + 1, second - first - 1); +} + +std::optional sender_id_from_path(const std::string& path, const std::string& suffix) { + constexpr const char* prefix = "/api/senders/"; + if (path.rfind(prefix, 0) != 0 || path.size() <= std::strlen(prefix) + suffix.size()) { + return std::nullopt; + } + if (path.substr(path.size() - suffix.size()) != suffix) { + return std::nullopt; + } + const auto id_text = path.substr(std::strlen(prefix), path.size() - std::strlen(prefix) - suffix.size()); + const auto parsed = parse_u32(id_text); + if (!parsed || *parsed == 0 || *parsed > 65535) { + return std::nullopt; + } + return static_cast(*parsed); +} + +void reset_stats(SharedState& state) { + std::lock_guard lock(state.mutex); + for (auto& [sender_id, stats] : state.senders) { + stats.packets = 0; + stats.dropped = 0; + stats.out_of_order = 0; + } +} + +std::string handle_http_request(const std::string& request, SharedState& state) { + std::istringstream stream(request); + std::string method; + std::string path; + stream >> method >> path; + const auto body_pos = request.find("\r\n\r\n"); + const std::string body = body_pos == std::string::npos ? std::string{} : request.substr(body_pos + 4); + + if (method == "GET" && (path == "/" || path == "/index.html")) { + return http_response("text/html; charset=utf-8", index_html()); + } + if (method == "GET" && path == "/api/status") { + return http_response("application/json", make_status_json(state)); + } + if (method == "POST" && path == "/api/output/mute") { + std::lock_guard lock(state.mutex); + state.output_muted = body_bool(body, "muted", state.output_muted); + return http_response("application/json", "{\"ok\":true}"); + } + if (method == "POST" && path == "/api/stats/reset") { + reset_stats(state); + return http_response("application/json", "{\"ok\":true}"); + } + if (method == "POST") { + if (const auto id = sender_id_from_path(path, "/mute")) { + std::lock_guard lock(state.mutex); + state.controls[*id].muted = body_bool(body, "muted", state.controls[*id].muted); + return http_response("application/json", "{\"ok\":true}"); + } + if (const auto id = sender_id_from_path(path, "/gain")) { + std::lock_guard lock(state.mutex); + const auto gain = std::max(0.0, std::min(4.0, body_double(body, "gain", state.controls[*id].gain))); + state.controls[*id].gain = gain; + return http_response("application/json", "{\"ok\":true}"); + } + if (const auto id = sender_id_from_path(path, "/name")) { + std::lock_guard lock(state.mutex); + state.controls[*id].name = body_string(body, "name"); + return http_response("application/json", "{\"ok\":true}"); + } + } + + return http_response("text/plain; charset=utf-8", "not found\n", "404 Not Found"); +} + +int make_web_socket(std::uint16_t web_port) { + const int fd = ::socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + throw std::runtime_error("web socket failed: " + std::string(std::strerror(errno))); + } + + int reuse = 1; + if (::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) != 0) { + ::close(fd); + throw std::runtime_error("web setsockopt(SO_REUSEADDR) failed: " + std::string(std::strerror(errno))); + } + + sockaddr_in address{}; + address.sin_family = AF_INET; + address.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + address.sin_port = htons(web_port); + + if (::bind(fd, reinterpret_cast(&address), sizeof(address)) != 0) { + ::close(fd); + throw std::runtime_error("web bind failed on 127.0.0.1:" + std::to_string(web_port) + ": " + + std::string(std::strerror(errno))); + } + if (::listen(fd, 16) != 0) { + ::close(fd); + throw std::runtime_error("web listen failed: " + std::string(std::strerror(errno))); + } + + return fd; +} + +void web_loop(SharedState& state) { + FileDescriptor listener(make_web_socket(state.config.web_port)); + std::cout << "Web UI listening on http://" << kWebHost << ":" << state.config.web_port << '\n'; + + while (g_running.load()) { + fd_set read_set; + FD_ZERO(&read_set); + FD_SET(listener.get(), &read_set); + timeval timeout{}; + timeout.tv_sec = 1; + timeout.tv_usec = 0; + const int ready = ::select(listener.get() + 1, &read_set, nullptr, nullptr, &timeout); + if (ready <= 0) { + continue; + } + + sockaddr_in peer{}; + socklen_t peer_len = sizeof(peer); + const int client = ::accept(listener.get(), reinterpret_cast(&peer), &peer_len); + if (client < 0) { + if (errno == EINTR) { + continue; + } + std::cerr << "web accept failed: " << std::strerror(errno) << '\n'; + continue; + } + + char buffer[8192]; + const auto received = ::recv(client, buffer, sizeof(buffer) - 1, 0); + if (received > 0) { + buffer[received] = '\0'; + const auto response = handle_http_request(std::string(buffer, static_cast(received)), state); + const char* data = response.data(); + std::size_t remaining = response.size(); + while (remaining > 0) { + const auto sent = ::send(client, data, remaining, 0); + if (sent <= 0) { + break; + } + data += sent; + remaining -= static_cast(sent); + } + } + ::close(client); + } +} + +void update_buffer_state(SharedState& state, const AudioRingBuffer& audio_buffer) { + std::lock_guard lock(state.mutex); + state.buffered_samples = audio_buffer.filled(); + state.overflows = audio_buffer.overflow_count(); + state.underflows = audio_buffer.underflow_count(); +} + +SenderControl sender_control_for(SharedState& state, std::uint16_t sender_id) { + std::lock_guard lock(state.mutex); + return state.controls[sender_id]; +} + +bool output_muted(const SharedState& state) { + std::lock_guard lock(state.mutex); + return state.output_muted; +} + +void apply_control(std::vector& pcm, const SenderControl& control, bool output_is_muted) { + if (output_is_muted || control.muted) { + std::fill(pcm.begin(), pcm.end(), 0); + return; + } + if (control.gain == 1.0) { + return; + } + for (auto& sample : pcm) { + const auto scaled = static_cast(static_cast(sample) * control.gain); + sample = static_cast(std::max(-32768, std::min(32767, scaled))); + } +} + +void receive_loop(const Config& config, AudioRingBuffer& audio_buffer, SharedState& state) { + { + std::lock_guard lock(state.mutex); + state.udp_state = "starting"; + state.udp_error.clear(); + } + FileDescriptor socket(make_udp_socket(config.listen_port)); std::vector packet(65536); - std::map stats_by_sender; std::uint16_t active_sender_id = config.sender_id; std::cout << "Listening on UDP port " << config.listen_port << '\n'; + { + std::lock_guard lock(state.mutex); + state.udp_state = "listening"; + state.active_sender_id = active_sender_id; + } if (active_sender_id == 0) { std::cout << "Accepting the first valid sender_id as the active sender\n"; } else { @@ -507,6 +1045,10 @@ void receive_loop(const Config& config, AudioRingBuffer& audio_buffer) { if (errno == EINTR) { continue; } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + update_buffer_state(state, audio_buffer); + continue; + } throw std::runtime_error("recvfrom failed: " + std::string(std::strerror(errno))); } @@ -519,31 +1061,45 @@ void receive_loop(const Config& config, AudioRingBuffer& audio_buffer) { if (active_sender_id == 0) { active_sender_id = header->sender_id; std::cout << "Active sender_id selected: " << active_sender_id << '\n'; + std::lock_guard lock(state.mutex); + state.active_sender_id = active_sender_id; } if (header->sender_id != active_sender_id) { continue; } - auto& stats = stats_by_sender[header->sender_id]; - update_stats(stats, *header); + SenderStats stats_snapshot; + { + std::lock_guard lock(state.mutex); + auto& stats = state.senders[header->sender_id]; + update_stats(stats, *header); + stats_snapshot = stats; + } const auto* payload = packet.data() + header->header_len; auto pcm = decode_pcm_s16le(payload, header->payload_bytes); + apply_control(pcm, sender_control_for(state, header->sender_id), output_muted(state)); audio_buffer.write(pcm.data(), pcm.size()); + update_buffer_state(state, audio_buffer); - if (stats.packets % config.log_interval_packets == 0) { + if (stats_snapshot.packets % config.log_interval_packets == 0) { std::cout << "sender_id=" << header->sender_id << " session_id=" << header->session_id - << " packets=" << stats.packets - << " dropped=" << stats.dropped - << " out_of_order=" << stats.out_of_order + << " packets=" << stats_snapshot.packets + << " dropped=" << stats_snapshot.dropped + << " out_of_order=" << stats_snapshot.out_of_order << " buffered_samples=" << audio_buffer.filled() << " overflows=" << audio_buffer.overflow_count() << " underflows=" << audio_buffer.underflow_count() << '\n'; } } + + { + std::lock_guard lock(state.mutex); + state.udp_state = "stopped"; + } } } // namespace @@ -562,18 +1118,67 @@ int main(int argc, char** argv) { std::cerr << "audio_output selection is not implemented yet; using default output\n"; } + SharedState shared_state; + shared_state.config = config; + shared_state.config_path = config_path; + const auto buffer_frames = (static_cast(config.sample_rate) * config.jitter_buffer_ms) / 1000; AudioRingBuffer audio_buffer(buffer_frames * config.channels); - AudioDevice audio_device(audio_buffer, config); + std::unique_ptr audio_device; std::cout << "Config: " << config_path << '\n'; std::cout << "Audio: " << config.sample_rate << " Hz, " << config.channels << " channel(s), frame_ms=" << config.frame_ms << ", jitter_buffer_ms=" << config.jitter_buffer_ms << '\n'; - receive_loop(config, audio_buffer); + + std::thread web_thread([&shared_state]() { + try { + web_loop(shared_state); + } catch (const std::exception& error) { + std::cerr << "web error: " << error.what() << '\n'; + g_running.store(false); + } + }); + + try { + { + std::lock_guard lock(shared_state.mutex); + shared_state.audio_state = "starting"; + shared_state.audio_error.clear(); + } + audio_device = std::make_unique(audio_buffer, config); + { + std::lock_guard lock(shared_state.mutex); + shared_state.audio_state = "running"; + } + } catch (const std::exception& error) { + std::cerr << "audio error: " << error.what() << '\n'; + std::lock_guard lock(shared_state.mutex); + shared_state.audio_state = "error"; + shared_state.audio_error = error.what(); + } + + try { + receive_loop(config, audio_buffer, shared_state); + } catch (const std::exception& error) { + std::cerr << "udp error: " << error.what() << '\n'; + { + std::lock_guard lock(shared_state.mutex); + shared_state.udp_state = "error"; + shared_state.udp_error = error.what(); + } + while (g_running.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + } + + g_running.store(false); + if (web_thread.joinable()) { + web_thread.join(); + } } catch (const std::exception& error) { std::cerr << "mic_server error: " << error.what() << '\n'; return 1;