<?php
use mokuyu\Cache;
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use model\Company;
$loader = require __DIR__ . '/vendor/autoload.php';
$dbConfig = require_once __DIR__ . '/config.php';
$redisConfig = require_once __DIR__ . '/redis.php';
$max = 50;
$dbQueue = [];
for ($i = 0; $i <= $max; $i++) {
$dbConfig['uuid'] = 'connection' . $i;
$obj = new Company($dbConfig);
$obj->setCache(new Cache($redisConfig));
$dbQueue[] = $obj;
}
run(function () use (&$dbQueue, $max) {
while (true) {
Swoole\Coroutine::sleep(1);
if (!$dbQueue) {
echo 'not connection!', PHP_EOL;
continue;
}
$mod = array_pop($dbQueue);
$datas = $mod->where('is_parse_address', 0)
->limit($max)
->order('id asc')
->select();
array_push($dbQueue, $mod);
if (!$datas) {
echo 'end!', PHP_EOL;
break;
}
foreach ($datas as $data) {
if (!$dbQueue) {
continue;
}
$temCon = array_pop($dbQueue);
Coroutine::create(function () use ($temCon, $data, &$dbQueue) {
$updateData = [];
$failData = [];
// $dbConfig['uuid'] = 'connection' . $key;
// $mod = new Company($dbConfig);
$temCon->updateAddress($data, $updateData, $failData);
if ($updateData) {
$res = $temCon->save($updateData);
echo date('Y-m-d H:i:s '), $res ? 'success' : 'fail', ': Update: ', count($updateData), ', latest Id: ', end($updateData)['id'], PHP_EOL;
}
if ($failData) {
$res = $temCon->save($failData);
echo date('Y-m-d H:i:s '), $res ? 'success' : 'fail', ': Skip: ', count($failData), ', latest Id: ', end($failData)['id'], PHP_EOL;
}
array_push($dbQueue, $temCon);
});
}
}
});
echo 1;//可以得到执行