基于 swoole 的多进程消息同步微服务 -- (1)需求介绍和系统架构设计

写在前面----该项目第一版代码源自导师---蓝天 在正常的业务需求中,总是有各种各样的目的需要解耦数据提供和使用方,在我们业务线中,需要在用户完成一个流程后较短的一段时间后形成数据报告,而数据来源有三个端,分别是C++、JAVA、JS。 在选型中,直接写入数据库存在非常高的成本和安全问题,而之前的业务流程中我们已经使用了阿里云的日志服务,调研之后发现在时效性、安全性、稳定性和快速实现等多方面都非常满足我们的需求,在约定了数据格式后,我们就采用了这个中间件的方式实现了多方数据的汇总处理、消费。 技术选型完成,那么就是技术架构,在技术栈上使用 swoole 开发,这里不得不提到阿里云日志系统在实时采集与消费(LogHub),图片来自阿里云日志服务:

而我们则需要从各个端保存日志的 Logstore 中将数据实时、稳定的拉取下来即可;而阿里云日志系统中每一个 Logstore 都采用多个 Shard 的方式保证扩展性,在官方的 demo 中清晰的看到其对一个 Logstore 的日志拉取是依次拉取各个分片的,贴下核心代码:

foreach($listShardResponse-> getShardIds()  as $shardId)
{
    #对每一个 ShardId,先获取 Cursor
    $getCursorRequest = new Aliyun_Log_Models_GetCursorRequest($project,$logstore,$shardId,null, time() - 60);
    $response = $client -> getCursor($getCursorRequest);
    $cursor = $response-> getCursor();
    $count = 100;
    while(true)
    {
        #从 cursor 开始读数据
        $batchGetDataRequest = new Aliyun_Log_Models_BatchGetLogsRequest($project,$logstore,$shardId,$count,$cursor);
        var_dump($batchGetDataRequest);
        $response = $client -> batchGetLogs($batchGetDataRequest);
        if($cursor == $response -> getNextCursor())
        {
            break;
        }
        $logGroupList = $response -> getLogGroupList();
        foreach($logGroupList as $logGroup)
        {
            print ($logGroup->getCategory());
            foreach($logGroup -> getLogsArray() as $log)
            {
                foreach($log -> getContentsArray() as $content)
                {
                    print($content-> getKey().":".$content->getValue()."t");
                }
                print("n");
            }
        }
        $cursor = $response -> getNextCursor();
    }
}

而我们在实现的过程不能直接这么写,参考其他语言(JAVA)的 SDK 也可以很清晰明了的看到常驻进程服务才是阿里云官方的推荐,在简单的取舍之后果断将其改写成常驻进程服务的形式,思考再三,架构需要完成如下内容: 1、数据实时、不丢失拉取; 2、服务需长时间稳定运行; 思考分析之后给出系统架构图如下所示:

采用 swoole server 方式实现服务器,在 master 进程中申请全局共享内存空间----hashtable、channel 等,在这之下再 fork 三个进程,分别为 LogAgent、Monitor和 Check,其作用见上图,主要进程为 Monitor 进程,其作用为: 1、监控全局所有由人工申请而来的进程是否正常工作,对他们进行心跳监控; 2、保证所有的 Consumer 进程和 Producer 进程正确运行,保证机制为重启; 3、为全局提供数据 dump 机制,保证在正常退出情况下未处理完的数据保存在本地硬盘;

Last updated