基于 swoole 的多进程消息同步微服务 -- (2)阿里云日志拉取实现与数据 check

基于之前的架构设计,本文旨在实现阿里云的日志拉取和数据check机制,日志落地有很多种选择,这里选择为 mysql 数据库。 首先,logstore 的拉取可以参考阿里云官方的SDK,这里贴下其github地址官网快速入门地址,据此我实现的主要代码如下:

    function getLogs($startTime , $endTime, $logstore) {
        $client = new Aliyun_Log_Client($this->endpoint, $this->accessKeyId, $this->accessKey);

        $listShardRequest = new Aliyun_Log_Models_ListShardsRequest($this->project,$logstore);
        $listShardResponse = $client -> listShards($listShardRequest);
        $shardArr = $listShardResponse-> getShardIds();
        foreach($shardArr  as $shardId)
        {
            #对每一个 ShardId,先获取 Cursor  1511270484
            $getCursorRequest = new Aliyun_Log_Models_GetCursorRequest($this->project,$logstore,$shardId,null, $startTime);
            $response = $client -> getCursor($getCursorRequest);
            $cursor = $response-> getCursor();
            $count = 100;
            while(true)
            {
                #从 cursor 开始读数据
                $batchGetDataRequest = new Aliyun_Log_Models_BatchGetLogsRequest($this->project,$logstore,$shardId,$count,$cursor);
                $response = $client -> batchGetLogs($batchGetDataRequest);
                if($cursor == $response -> getNextCursor())
                {
                    break;
                }

                if(!$this->handleOneArr($response,$endTime,$logstore)) {
                    break;
                }

                $cursor = $response -> getNextCursor();
            }
        }
    }

主要的内容即根据当前传入的时间戳初始化光标,根据光标和单次请求最大拉取日志条数拉取数据,达到shard底部或人为设置的阈值即停止拉取,底层实现即curl,这里不关注底层实现细节。

时间戳的光标初始化:这里可能有些痛苦,在使用 kafka 时我们知道其支持 uuid ,这样我们只需要记录下 uuid 就可以准确的记录每一条数据是否成功被消费,而使用时间戳好处在于易于记录和可读性,在准确性上就要相差很多,不过这个问题不影响正常使用,这里提示下不要踩坑。

为了实现一个测试用例,我们还需要自己实现一个日志上传,因为技术栈支持,所以我们简单的实现了一个swoole 多进程日志上传,日志上传基本逻辑如下:起两个进程,每个进程分别向阿里云日志系统推送日志流,一个推送连续奇数,一个推送连续偶数;每隔300ms推送一条数据;这样设计有两个好处,一来可以简单的模拟一秒内多条数据出现时日志拉取是否会出现错误,二来可以通过检测日志消费之后数字是否连续判断是否出现丢数据情况;基于这个逻辑,我设计了如下的日志上传,主要实现如下:

    function pushToAliyun ($start,$checkLogFile,$sleepTime = 500) {
        pcntl_signal(SIGTERM, array($this, "killAllProcess"));
        $newStart = $start;
        $endpoint = 'cn-beijing.log.aliyuncs.com'; // 选择与上面步骤创建 project 所属区域匹配的 Endpoint
        $accessKeyId = '3cP7PB2BwPBM8gck';        // 使用你的阿里云访问秘钥 AccessKeyId
        $accessKey = 'KgIQwhPnunasNEKx3BTEeCIlfLGvgE';             // 使用你的阿里云访问秘钥 AccessKeySecret
        $project = 'shuangshi';                  // 上面步骤创建的项目名称
        $logstore = 'doubleteacher_analytics_pulllog_test';                // 上面步骤创建的日志库名称
        $client = new Aliyun_Log_Client($endpoint, $accessKeyId, $accessKey);

        // todo while 循环

        for ($i = 0 ; $i < 200000; ++ $i) {
            $newStart = $newStart + 2;
            $logInfo = [
                'event_name'=> "ALIYUN_TEST",
                "event_time"=> time(),
                "event_msec"=> rand(0,1000),
                "index"=> $newStart,
            ];
            #写入日志
            $topic = "";
            $source = "";
            $logitems = array();

            $logItem = new Aliyun_Log_Models_LogItem();
            $logItem->setTime(time().'123');
            $logItem->setContents($logInfo);
            array_push($logitems, $logItem);

            $req2 = new Aliyun_Log_Models_PutLogsRequest($project, $logstore, $topic, $source, $logitems);
            $client->putLogs($req2);
            usleep($sleepTime);
        }

        // 一分钟之后,结束 process checklog 进程并自动退出
        sleep(60);
        $checkLogPid = file_get_contents($checkLogFile);
        exec("kill $checkLogPid");
        exit;

    }

实现了日志上传逻辑之后,在拉取逻辑中直接实现日志落地到 mysql 数据库; 在测试用例中我们只需要启动一个全新的进程,使其专门检测指定数据表中在指定的开始和结束时间段内数据是否连续,同时记录下处理的总条数,最后只需要验证总条数相等且数据连续即可证明服务正常。 在正式的业务场景中,我们运行了一个多进程日志拉取服务,然而却不应该在该服务中再启动一个进程用于数据检测,主要原因在于 check 机制需要稳定运行,其本身的意义在于发现并检测日志拉取服务是否出现问题,不应该将其与服务放在一起管理,所以这里我们选择在 crontab 上实现一个定时任务,拉取当前时间到一个小时之前的所有数据根据业务逻辑过滤之后再与数据库对比即可。

Last updated