krowinski/php-mysql-replication 基于binlog的数据同步

目录

通过监听 MySQL binlog 的方式捕获数据更新,并对变更的数据进行所需的处理。

https://github.com/krowinski/php-mysql-replication/blob/master/README.md

前置条件:

- PHP 需开启 sockets 扩展
- MySQL(容器)的 binlog_format 需要配置为 ROW

<?php

namespace App\Console\Commands\Search;

use App\Models\Redis\SearchRedis;
use Illuminate\Console\Command;
use Illuminate\Support\Arr;
use MySQLReplication\Config\ConfigBuilder;
use MySQLReplication\Event\DTO\DeleteRowsDTO;
use MySQLReplication\Event\DTO\UpdateRowsDTO;
use MySQLReplication\Event\DTO\WriteRowsDTO;
use MySQLReplication\Event\EventSubscribers;
use MySQLReplication\MySQLReplicationFactory;

/**
 * Console command that tails the MySQL binlog and forwards row changes of the
 * tables listed under the `search` config key to SearchRedis::setEvent().
 */
class Event extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'search:event';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '监听MySQL事件';

    /**
     * Build the replication stream, register the row-event subscriber and
     * start consuming binlog events.
     */
    public function handle()
    {
        // Fall back to 3306 (the previously hard-coded value) when no usable port is configured.
        $port = ((int) $this->dbSetting('DB_PORT', 'port', 3306)) ?: 3306;

        /**
         * Your db configuration
         * @see ConfigBuilder
         * @link https://github.com/krowinski/php-mysql-replication/blob/master/README.md
         */
        $binLogStream = new MySQLReplicationFactory(
            (new ConfigBuilder())
                ->withUser($this->dbSetting('DB_USERNAME', 'username'))
                ->withHost($this->dbSetting('DB_HOST', 'host'))
                ->withPassword($this->dbSetting('DB_PASSWORD', 'password'))
                ->withDatabasesOnly([$this->dbSetting('DB_DATABASE', 'database')])
                ->withPort($port)
                ->withSlaveId(100)
                ->withHeartbeatPeriod(2)
                ->build()
        );

        /**
         * Register your events handler
         * @see EventSubscribers
         */
        $binLogStream->registerSubscriber(
            new class() extends EventSubscribers {

                /**
                 * Extract the key column from the last row of a row-event value list.
                 *
                 * @param mixed  $type  One of the SearchRedis::EVENT_TYPE_* constants.
                 * @param array  $value List of rows; for updates each row is read through its "after" image.
                 * @param string $pk    Column name to read.
                 *
                 * @return mixed|null Column value, or null for an unknown type / missing column.
                 */
                public static function getPkValue($type, $value, $pk)
                {
                    $row = array_pop($value);

                    switch ($type) {
                        case SearchRedis::EVENT_TYPE_INSERT:
                        case SearchRedis::EVENT_TYPE_DELETE:
                            return Arr::get($row, $pk);
                        case SearchRedis::EVENT_TYPE_UPDATE:
                            return Arr::get($row, "after.{$pk}");
                    }

                    return null;
                }

                /**
                 * Read a single per-table setting from the `search` config.
                 *
                 * @param string $table
                 * @param string $field
                 *
                 * @return mixed
                 */
                public static function getConfig($table, $field)
                {
                    return config("search.{$table}.{$field}");
                }

                /**
                 * Publish one SearchRedis event per changed row of a binlog row event.
                 *
                 * Tables that are not keys of the `search` config are skipped with a notice.
                 *
                 * @param mixed                                     $type  One of the SearchRedis::EVENT_TYPE_* constants.
                 * @param WriteRowsDTO|UpdateRowsDTO|DeleteRowsDTO $event Row event received from the replication stream.
                 *
                 * @return void
                 */
                public static function setEvent($type, $event)
                {
                    $table = $event->getTableMap()->getTable();

                    // Cast guards against a missing `search` config (null) instead of raising a TypeError.
                    if (!array_key_exists($table, (array) config('search', []))) {
                        echo '无需监听的表:' . $table . PHP_EOL;
                        return;
                    }

                    $host = self::getConfig($table, 'host');
                    $keyColumn = self::getConfig($table, 'foreignKey');
                    $isKnownType = in_array(
                        $type,
                        [
                            SearchRedis::EVENT_TYPE_INSERT,
                            SearchRedis::EVENT_TYPE_UPDATE,
                            SearchRedis::EVENT_TYPE_DELETE,
                        ],
                        true
                    );

                    // A single binlog row event may carry several rows (multi-row statements);
                    // emit one event per row so none of them is dropped.
                    foreach ($event->getValues() as $row) {
                        $payload = [
                            SearchRedis::EVENT_TABLE  => $host,
                            SearchRedis::EVENT_ID     => self::getPkValue($type, [$row], $keyColumn),
                            SearchRedis::EVENT_COLUMN => $table,
                        ];

                        // Unknown types are still published, just without a type entry.
                        if ($isKnownType) {
                            $payload[SearchRedis::EVENT_TYPE] = $type;
                        }

                        SearchRedis::setEvent($payload);
                    }
                }

                /**
                 * Handle an UPDATE rows event.
                 */
                public function onUpdate(UpdateRowsDTO $event): void
                {
                    self::setEvent(SearchRedis::EVENT_TYPE_UPDATE, $event);

                    echo "↑↑↑↑↑↑↑↑↑↑↑↑   UPDATE    ↑↑↑↑↑↑↑↑↑↑↑↑↑" . PHP_EOL;
                }

                /**
                 * Handle an INSERT (write) rows event.
                 */
                public function onWrite(WriteRowsDTO $event): void
                {
                    self::setEvent(SearchRedis::EVENT_TYPE_INSERT, $event);

                    echo "↑↑↑↑↑↑↑↑↑↑↑↑   WRITE    ↑↑↑↑↑↑↑↑↑↑↑↑↑" . PHP_EOL;
                }

                /**
                 * Handle a DELETE rows event.
                 */
                public function onDelete(DeleteRowsDTO $event): void
                {
                    self::setEvent(SearchRedis::EVENT_TYPE_DELETE, $event);

                    echo "↑↑↑↑↑↑↑↑↑↑↑↑   DELETE   ↑↑↑↑↑↑↑↑↑↑↑↑↑" . PHP_EOL;
                }

            }
        );

        // start consuming events
        $binLogStream->run();
    }

    /**
     * Resolve a database setting, preferring the environment variable and
     * falling back to the default connection's config entry.
     *
     * env() yields null for .env-only values once the configuration is cached,
     * so the config fallback keeps the command working after `config:cache`.
     *
     * @param string $envKey    Environment variable name, e.g. "DB_HOST".
     * @param string $configKey Key inside database.connections.<default>, e.g. "host".
     * @param mixed  $default   Value used when neither source provides one.
     *
     * @return mixed
     */
    private function dbSetting(string $envKey, string $configKey, $default = null)
    {
        $connection = config('database.default');

        return env($envKey)
            ?? config("database.connections.{$connection}.{$configKey}")
            ?? $default;
    }
}