目录
通过监听 MySQL binlog 获取数据更新,并对变更的数据进行所需的处理
https://github.com/krowinski/php-mysql-replication/blob/master/README.md
前置条件:
PHP 的 sockets 扩展需开启;MySQL 容器的 binlog_format 需要配置为 row
<?php
namespace App\Console\Commands\Search;
use App\Models\Redis\SearchRedis;
use Illuminate\Console\Command;
use Illuminate\Support\Arr;
use MySQLReplication\Config\ConfigBuilder;
use MySQLReplication\Event\DTO\DeleteRowsDTO;
use MySQLReplication\Event\DTO\UpdateRowsDTO;
use MySQLReplication\Event\DTO\WriteRowsDTO;
use MySQLReplication\Event\EventSubscribers;
use MySQLReplication\MySQLReplicationFactory;
class Event extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'search:event';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '监听MySQL事件';

    /**
     * Connect to MySQL as a replication client and forward row-level binlog
     * events (insert / update / delete) of the tables listed in the `search`
     * config to SearchRedis.
     *
     * @see ConfigBuilder
     * @link https://github.com/krowinski/php-mysql-replication/blob/master/README.md
     */
    public function handle()
    {
        // NOTE(review): env() returns null once the Laravel config is cached;
        // consider reading these values through config('database.connections.*').
        $binLogStream = new MySQLReplicationFactory(
            (new ConfigBuilder())
                ->withUser(env('DB_USERNAME'))
                ->withHost(env('DB_HOST'))
                ->withPassword(env('DB_PASSWORD'))
                ->withDatabasesOnly([env('DB_DATABASE')])
                // Honour DB_PORT when it is set; 3306 remains the fallback.
                ->withPort((int) (env('DB_PORT') ?: 3306))
                ->withSlaveId(100)
                ->withHeartbeatPeriod(2)
                ->build()
        );

        /**
         * Register the events handler.
         * @see EventSubscribers
         */
        $binLogStream->registerSubscriber(
            new class() extends EventSubscribers {
                /**
                 * Extract the key value from the LAST row of a row-event value list.
                 *
                 * Insert and delete rows are read as flat column maps; update rows
                 * are read through their "after" image.
                 *
                 * @param mixed  $type  one of the SearchRedis::EVENT_TYPE_* constants
                 * @param array  $value list of rows taken from the binlog event
                 * @param string $pk    column name (dot notation is resolved by Arr::get)
                 * @return mixed|null the column value, or null when the type is
                 *                    unknown or the column is absent
                 */
                public static function getPkValue($type, $value, $pk)
                {
                    $row = array_pop($value);

                    switch ($type) {
                        case SearchRedis::EVENT_TYPE_INSERT:
                        case SearchRedis::EVENT_TYPE_DELETE:
                            return Arr::get($row, $pk);
                        case SearchRedis::EVENT_TYPE_UPDATE:
                            return Arr::get($row, "after.{$pk}");
                        default:
                            return null;
                    }
                }

                /**
                 * Read one setting of a table from the `search` config.
                 *
                 * @param string $table table name used as the config key
                 * @param string $field setting name below that table
                 * @return mixed
                 */
                public static function getConfig($table, $field)
                {
                    return config("search.{$table}.{$field}");
                }

                /**
                 * Publish one SearchRedis event per affected row of a binlog event.
                 *
                 * Tables that are not keys of the `search` config are reported on
                 * stdout and skipped.
                 *
                 * @param mixed  $type  one of the SearchRedis::EVENT_TYPE_* constants
                 * @param object $event rows DTO (write / update / delete) from the binlog stream
                 * @return void
                 */
                public static function setEvent($type, $event)
                {
                    $table = $event->getTableMap()->getTable();
                    $watched = config('search');

                    if (!is_array($watched) || !array_key_exists($table, $watched)) {
                        echo '无需监听的表:' . $table . PHP_EOL;
                        return;
                    }

                    $host = self::getConfig($table, 'host');
                    $keyColumn = self::getConfig($table, 'foreignKey');
                    $knownTypes = [
                        SearchRedis::EVENT_TYPE_INSERT,
                        SearchRedis::EVENT_TYPE_UPDATE,
                        SearchRedis::EVENT_TYPE_DELETE,
                    ];

                    // The rows come from the DTO itself (the original read an
                    // undefined variable). A single binlog event may carry several
                    // rows, so every row gets its own event instead of only the last.
                    foreach ($event->getValues() as $row) {
                        $payload = [
                            SearchRedis::EVENT_TABLE => $host,
                            SearchRedis::EVENT_ID => self::getPkValue($type, [$row], $keyColumn),
                            SearchRedis::EVENT_COLUMN => $table,
                        ];

                        // Unknown types are forwarded without a type entry.
                        if (in_array($type, $knownTypes, true)) {
                            $payload[SearchRedis::EVENT_TYPE] = $type;
                        }

                        SearchRedis::setEvent($payload);
                    }
                }

                /**
                 * Handle UPDATE row events.
                 */
                public function onUpdate(UpdateRowsDTO $event): void
                {
                    self::setEvent(SearchRedis::EVENT_TYPE_UPDATE, $event);
                    echo "↑↑↑↑↑↑↑↑↑↑↑↑ UPDATE ↑↑↑↑↑↑↑↑↑↑↑↑↑" . PHP_EOL;
                }

                /**
                 * Handle INSERT (write) row events.
                 */
                public function onWrite(WriteRowsDTO $event): void
                {
                    self::setEvent(SearchRedis::EVENT_TYPE_INSERT, $event);
                    echo "↑↑↑↑↑↑↑↑↑↑↑↑ WRITE ↑↑↑↑↑↑↑↑↑↑↑↑↑" . PHP_EOL;
                }

                /**
                 * Handle DELETE row events.
                 */
                public function onDelete(DeleteRowsDTO $event): void
                {
                    self::setEvent(SearchRedis::EVENT_TYPE_DELETE, $event);
                    echo "↑↑↑↑↑↑↑↑↑↑↑↑ DELETE ↑↑↑↑↑↑↑↑↑↑↑↑↑" . PHP_EOL;
                }
            }
        );

        // start consuming events
        $binLogStream->run();
    }
}