首页
关于
友情链接
推荐
悠笙の喵罐头
Search
1
Docker下中心化部署EasyTier
467 阅读
2
给Android 4.9内核添加KernelSU支持
438 阅读
3
记一次为Android 4.9内核的ROM启用erofs支持
171 阅读
4
为黑群晖迁移RR引导盘
136 阅读
5
在TrueNAS上使用Docker安装1Panel
133 阅读
Android
运维
NAS
开发
登录
Search
标签搜索
Linux
C&C++
Android
Windows
MSVC
Docker
AOSP
Kernel
服务
STL
caf/clo
Web
TrueNAS
群晖
Alist
OneDrive
1Panel
编程
EasyTier
DNS
神楽悠笙
累计撰写
13
篇文章
累计收到
2
条评论
首页
栏目
Android
运维
NAS
开发
页面
关于
友情链接
推荐
悠笙の喵罐头
搜索到
2
篇与
的结果
2025-05-19
跨平台服务编写日记 Ep.2 进程间通讯(IPC)
前情提要上一篇文章实现了统一的日志管理,这篇文章来实现进程间消息通讯,即IPC。分析Windows在Windows下,主要通过管道(Pipe)实现进程间通讯。管道又分为命名管道(Named Pipe)和匿名管道(Anonymous Pipe)。其中,匿名管道是单向的,通常在父进程和子进程之间通讯[2]。而命名管道则可以是单向管道或双工管道,并且支持一对多通讯[3]。顾名思义,识别命名管道的唯一方式是它的名称,因此两个进程只要都连接到同一个名字的命名管道即可实现通信。 我们需要实现进程间的双向通讯,因此采用命名管道。大致思路就是:进程作为伺服模式,也就是接收端启动时创建一个线程,创建一个命名管道并监听管道内消息。当管道被连接时从中读取管道内数据;当进程作为发送端启动时尝试连接到同一个名称的管道,并写入消息内容。Linux在Linux下通常使用socket进行进程间通讯。不过不同于监听端口,进程间通讯一般会选择监听一个sock文件[5],常见的服务类应用如docker daemon、mysql都是通过这种方式。 因此,大致思路如下:作为伺服模式启动的进程创建一个socket监听,并等待从中接收消息;发送端连接到socket套接字并发送消息。和上文命名管道的名称类似,socket套接字会映射一个唯一的.sock文件,发送方只要打开这个文件即可发送消息。(实际上打开方式不是常规的打开文件,而是用socket专用的打开方式[5])代码实现初始化为了实现共用一套主代码,我使用了和上一篇文章中一样的通过宏定义区分系统类型的方案,将Windows和Linux的代码分别写在service-windows.h和service-linux.h两个头文件中:#ifdef _WIN32 #include "service-windows.h" #elif defined(__linux__) #include "service-linux.h" #endif当接收端进程启动时,创建一个线程处理收信息(使用std::thread作为多线程库):thread_bind = std::thread(bind_thread_main);监听部分Windows在Windows下,只要尝试从指定名称的命名管道读取数据即可。其中,因为设置了管道为等待模式(即下文中CreateNamedPipe的第三个参数DWORD dwPipeMode中设置了PIPE_WAIT),ConnectNamedPipe会是阻塞模式,因此不用担心不断循环造成的性能损失。void bind_thread_main() { while (!exit_requested.load()) { HANDLE hPipe = CreateNamedPipe( PIPE_NAME, PIPE_ACCESS_DUPLEX, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, 1024, // Output buffer size 1024, // Input buffer size 0, // Default timeout NULL); if (hPipe == INVALID_HANDLE_VALUE) { service_log.push(LEVEL_WARN, "Failed to create pipe: %d", GetLastError()); continue; } if (ConnectNamedPipe(hPipe, NULL) || GetLastError() == ERROR_PIPE_CONNECTED) { char buffer[1024]; DWORD bytesRead; if (ReadFile(hPipe, buffer, sizeof(buffer) - 1, &bytesRead, NULL)) { buffer[bytesRead] = '\0'; m_queueMsg.push(buffer); service_log.push(LEVEL_VERBOSE, "Message received: %s", buffer); } FlushFileBuffers(hPipe); DisconnectNamedPipe(hPipe); CloseHandle(hPipe); } else { CloseHandle(hPipe); } } }Linux为了防止创建失败,在创建前会先尝试删除没有清理干净的sock文件,即代码中unlink(SOCKET_PATH)。SOCKET_PATH为全局变量,定义了套接字文件的路径。创建套接字时,指定family为AF_UNIX代表创建UNIX套接字,即.sock文件的这种类型(如果是网络套接字就是AF_INET)。timeval这段代码设置了一个超时限制,当accept函数等待时间超过设置的SOCKET_TIMEOUT超时时间(单位秒)后会自动结束阻塞并返回错误信息。创建完套接字后按照正常流程设置绑定和监听即可。void bind_thread_main() { unlink(SOCKET_PATH); int server_fd = socket(AF_UNIX, SOCK_STREAM, 0); if (server_fd == -1) { service_log.push(LEVEL_FATAL, "Failed to create socket"); exit_requested.store(true); return; } struct timeval tv; tv.tv_sec = SOCKET_TIMEOUT; tv.tv_usec = 0; setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); sockaddr_un addr{}; addr.sun_family = AF_UNIX; strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1); if (bind(server_fd, (sockaddr*)&addr, sizeof(addr)) == -1) { service_log.push(LEVEL_FATAL, "Bind failed"); close(server_fd); exit_requested.store(true); return; } if (listen(server_fd, 5) == -1) { service_log.push(LEVEL_FATAL, "Listen failed"); close(server_fd); exit_requested.store(true); return; } while (!exit_requested.load()) { int client_fd = accept(server_fd, nullptr, nullptr); if (client_fd != -1) { char buffer[1024]; int bytes_read = read(client_fd, buffer, sizeof(buffer) - 1); if (bytes_read > 0) { buffer[bytes_read] = '\0'; m_queueMsg.push(buffer); service_log.push(LEVEL_VERBOSE, "Message received: %s", buffer); } close(client_fd); } else { if (errno == EWOULDBLOCK || errno == EAGAIN) { continue; } service_log.push(LEVEL_WARN, "Failed to accept socket connection"); } } }当读取到消息后,两份代码都会将消息保存至阻塞队列m_queueMsg中。发送部分Windows打开指定管道并写入消息内容即可:bool send_message(const std::string& msg) { if (!WaitNamedPipe(PIPE_NAME, NMPWAIT_WAIT_FOREVER)) { service_log.push(LEVEL_ERROR, "Failed to find valid pipe: %d", GetLastError()); return false; } HANDLE hPipe = CreateFile( PIPE_NAME, GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL); if (hPipe == INVALID_HANDLE_VALUE) { service_log.push(LEVEL_ERROR, "Failed to connect: %d", GetLastError()); return false; } DWORD bytesWritten; if (WriteFile(hPipe, msg.c_str(), (DWORD)msg.size(), &bytesWritten, NULL)) { service_log.push(LEVEL_VERBOSE, "Message sent: %s", msg.c_str()); CloseHandle(hPipe); return true; } else { service_log.push(LEVEL_ERROR, "Message (%s) send failed: %d", msg.c_str(),GetLastError()); CloseHandle(hPipe); return false; } }Linux同理,连接套接字,发送数据:bool send_message(const std::string& msg) { int sock = socket(AF_UNIX, SOCK_STREAM, 0); if (sock == -1) { service_log.push(LEVEL_ERROR, "Failed to create socket"); return false; } sockaddr_un addr{}; addr.sun_family = AF_UNIX; strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1); if (connect(sock, (sockaddr*)&addr, sizeof(addr)) == -1) { service_log.push(LEVEL_ERROR, "Connect failed"); close(sock); return false; } if (write(sock, msg.c_str(), msg.size()) == -1) { service_log.push(LEVEL_ERROR, "Message send failed: %s", msg.c_str()); close(sock); return false; } else { service_log.push(LEVEL_VERBOSE, "Message sent success: %s", msg.c_str()); close(sock); return true; } }清理Windows下没什么需要清理的,Linux下删除套接字文件即可:unlink(SOCKET_PATH);效果示意图WindowsLinux样例代码下载:IPCTest.zip参考文章:https://learn.microsoft.com/zh-cn/windows/win32/ipc/pipeshttps://learn.microsoft.com/zh-cn/windows/win32/ipc/anonymous-pipeshttps://learn.microsoft.com/zh-cn/windows/win32/ipc/named-pipeshttps://www.cnblogs.com/alantu2018/p/8493809.htmlhttps://blog.csdn.net/dog250/article/details/100998838
2025年05月19日
26 阅读
0 评论
0 点赞
2025-04-19
跨平台服务编写日记 Ep.1 统一的日志管理
前阵子心血来潮,想为在使用的一个跨平台的控制台服务类应用程序编写一个自己的管理程序,加一些功能。因而设计了一套简单的服务运行流程。{alert type="warning"}本系列文章中的观点和方案为我根据自己已有的知识储备结合DeepSeek的帮助所归纳设计,并未经过严格测试,并不保证在生产环境中使用的可行性和稳定性{/alert}大致思路大致分为个线程,分别用作:日志记录目标应用实例管理,可能不止一个线程监听IPC消息处理IPC收到的消息(主进程)本文着重讨论的是日志记录部分。编写思路为什么要给日志记录单开一个线程,个人考虑是因为本身就是多线程的架构,需要编写一个统一的日志记录模块。如果每个线程单独打印,则很有可能出现两个线程同时写入文件或者同时输出到控制台,造成日志混乱。 因此,日志记录大致思路就是:定义一个队列,存储日志内容和等级创建一个线程,不断地从线程中取出元素,根据设定的日志等级决定是否打印到控制台或者输出到文件外部push日志内容到队列中一些细节上的内容保证可移植性,尽量使用STL库编写,如使用std::thread而不是pthread保证线程安全,需要使用互斥锁之类的保护相应变量让日志队列为空时线程等待,想到编写一个类似于Java下BlockingQueue的阻塞队列指定一个日志等级,超过这个等级的日志才会被保存或者打印通过va_list实现不定参数,使日志记录有sprintf的的使用体验开始编写有了上述思路,整体编写就很简单了。BlockingQueue偷懒了,这部分直接让DeepSeek写的为了实现一个多线程安全的阻塞队列,当队列为空时调用front()会阻塞直到其他线程添加元素,我们可以结合互斥锁(std::mutex)和条件变量(std::condition_variable)来同步线程操作。代码实现互斥锁(std::mutex)所有对队列的操作(push、front、pop、empty)都需要先获取锁,确保同一时间只有一个线程能修改队列,避免数据竞争。条件变量(std::condition_variable)当调用front()时,如果队列为空,线程会通过cv_.wait()释放锁并阻塞,直到其他线程调用push()添加元素后,通过cv_.notify_one()唤醒一个等待线程。cv_.wait()需配合std::unique_lock,并在等待时自动释放锁,避免死锁。使用谓词检查([this] { return !queue_.empty(); })防止虚假唤醒。元素获取与移除front()返回队首元素的拷贝(而非引用),确保调用者获得数据时队列的锁已释放,避免悬空引用。pop()需显式调用以移除元素,确保队列状态可控。#include <queue> // 队列 #include <mutex> // 互斥锁 #include <condition_variable> // 条件变量 template<typename T> class BlockingQueue { public: // 向队列中添加元素 void push(const T& item) { std::lock_guard<std::mutex> lock(mtx_); queue_.push(item); cv_.notify_one(); // 通知一个等待的线程 } // 获取队首元素(阻塞直到队列非空) T front() { std::unique_lock<std::mutex> lock(mtx_); cv_.wait(lock, [this] { return !queue_.empty(); }); // 阻塞直到队列非空 return queue_.front(); } // 获取队首元素并移除 T take() { std::unique_lock<std::mutex> lock(mtx_); cv_.wait(lock, [this] { return !queue_.empty(); }); T item = std::move(queue_.front()); // 移动语义避免拷贝 queue_.pop(); return item; } // 移除队首元素(需外部调用,非阻塞) void pop() { std::lock_guard<std::mutex> lock(mtx_); if (!queue_.empty()) { queue_.pop(); } } // 检查队列是否为空 bool empty() const { std::lock_guard<std::mutex> lock(mtx_); return queue_.empty(); } private: mutable std::mutex mtx_; // 互斥锁 std::condition_variable cv_; // 条件变量 std::queue<T> queue_; // 内部队列 };Log类Log.h#pragma once #include <iostream> #include <fstream> #include <cstring> #include <thread> #include <chrono> #include <mutex> #include <cstdio> #include <cstdarg> #include <atomic> #include "BlockingQueue.h" enum LogLevel { LEVEL_VERBOSE,LEVEL_INFO,LEVEL_WARN,LEVEL_ERROR,LEVEL_FATAL,LEVEL_OFF }; struct LogMsg { short m_LogLevel; std::string m_strTimestamp; std::string m_strLogMsg; }; class Log { private: std::ofstream m_ofLogFile; // 日志文件输出流 std::mutex m_lockFile; // 文件操作互斥锁 std::thread m_threadMain; // 后台日志处理线程 BlockingQueue<LogMsg> m_msgQueue; // 线程安全阻塞队列 short m_levelLog, m_levelPrint; // 文件和控制台日志级别阈值 std::atomic<bool> m_exit_requested{ false }; // 线程退出标志 std::string getTime(); // 获取当前时间戳 std::string level2str(short level, bool character_only); // 级别转字符串 void logThread(); // 后台线程函数 public: Log(short default_loglevel = LEVEL_WARN, short default_printlevel = LEVEL_INFO); ~Log(); void push(short level, const char* msg, ...); // 添加日志(支持格式化) void set_level(short loglevel, short printlevel); // 设置日志级别 bool open(std::string filename); // 打开日志文件 bool close(); // 关闭日志文件 }; Log.cpp#include "Log.h" std::string Log::getTime() { using sc = std::chrono::system_clock; std::time_t t = sc::to_time_t(sc::now()); char buf[20]; #ifdef _WIN32 std::tm timeinfo; localtime_s(&timeinfo,&t); sprintf_s(buf, "%04d.%02d.%02d-%02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec ); #else strftime(buf, 20, "%Y.%m.%d-%H:%M:%S", localtime(&t)); #endif return buf; } std::string Log::level2str(short level, bool character_only) { switch (level) { case LEVEL_VERBOSE: return character_only ? "V" : "Verbose"; case LEVEL_WARN: return character_only ? "W" : "Warning"; case LEVEL_ERROR: return character_only ? "E" : "Error"; case LEVEL_FATAL: return character_only ? "F" : "Fatal"; } return character_only ? "I" : "Info"; } void Log::logThread() { while (true) { LogMsg front = m_msgQueue.take(); // 阻塞直到有消息 // 处理文件写入 if (front.m_LogLevel >= m_levelLog) { std::lock_guard<std::mutex> lock(m_lockFile); // RAII 管理锁 if (m_ofLogFile) { m_ofLogFile << front.m_strTimestamp << ' ' << level2str(front.m_LogLevel, true) << ": " << front.m_strLogMsg << std::endl; } } // 处理控制台打印 if (front.m_LogLevel >= m_levelPrint) { printf("%s %s: %s\n", front.m_strTimestamp.c_str(), level2str(front.m_LogLevel, true).c_str(), front.m_strLogMsg.c_str()); } // 检查退出条件:队列为空且标志为真 if (m_exit_requested.load() && m_msgQueue.empty()) break; } return; } Log::Log(short default_loglevel, short default_printlevel) { set_level(default_loglevel, default_printlevel); m_threadMain = std::thread(&Log::logThread, this); } Log::~Log() { m_exit_requested.store(true); m_msgQueue.push({ LEVEL_INFO, getTime(), "Exit." }); // 唤醒可能阻塞的线程 if (m_threadMain.joinable()) m_threadMain.join(); close(); // 确保文件关闭 } void Log::push(short level, const char* msg, ...) { va_list args; va_start(args, msg); const int len = vsnprintf(nullptr, 0, msg, args); va_end(args); if (len < 0) return; std::vector<char> buf(len + 1); va_start(args, msg); vsnprintf(buf.data(), buf.size(), msg, args); va_end(args); m_msgQueue.push({level,getTime(),buf.data()}); } void Log::set_level(short loglevel, short printlevel) { m_levelLog = loglevel; m_levelPrint = printlevel; } bool Log::open(std::string filename) { m_lockFile.lock(); m_ofLogFile.open(filename.c_str(), std::ios::out); m_lockFile.unlock(); return (bool)m_ofLogFile; } bool Log::close() { m_lockFile.lock(); m_ofLogFile.close(); m_lockFile.unlock(); return false; }说明类/结构说明LogLevel 枚举定义日志级别:VERBOSE, INFO, WARN, ERROR, FATAL, OFF。OFF不应被用于记录的日志等级,仅用于当需要关闭日志记录时将阈值设置为该项以实现所有日志都不记录LogMsg 结构体封装日志消息:m_LogLevel:日志级别。m_strTimestamp:时间戳字符串。m_strLogMsg:日志内容。成员变量说明变量说明m_ofLogFile文件输出流,用于写入日志文件。m_lockFile互斥锁,保护文件操作。m_threadMain后台线程,处理日志消息的消费。m_msgQueue阻塞队列,存储待处理的日志消息。m_levelLog写入文件的最低日志级别(高于此级别的消息会被记录)。m_levelPrint打印到控制台的最低日志级别。m_exit_requested原子标志,控制日志线程退出。函数说明函数说明getTime获取当前时间戳字符串(跨平台实现)。level2str将日志级别转换为字符串(如 LEVEL_INFO → "I" 或 "Info")。logThread后台线程函数:消费队列消息,写入文件或打印。构造函数初始化日志级别,启动后台线程。析构函数设置退出标志,等待线程结束,确保处理剩余消息。push格式化日志消息(支持可变参数)并推入队列。set_level动态设置日志级别和打印级别。open/close打开/关闭日志文件。完整代码及测试样例下载:demo.zip
2025年04月19日
27 阅读
0 评论
0 点赞