Stream in real-time the MySQL database events, such as CRUD and DDL. That may be useful for building Data Warehouse, integration with monolith application on the database level, etc.
MysqlBinlogStream
.rawEvents[IO](binlogClient)
.through(streamEvents[IO](transactionState))
.evalTap(msg => logger.info(s"received $msg"))
Refer to examples for complete snippet
- build docker image
sbt "project mysql-binlog-stream-examples" clean "docker:publishLocal"
- run docker compose with MySql and example service
cd mysql-binlog-stream-examples
docker-compose -f docker-compose-mysql-it-mac.yaml up
- open new console
- in new console
mysql -u 'root' --host=0.0.0.0 --port=3307 --password=''
- try to insert new row in
SKU
orVARIANT
table
use test;
insert into sku(id, sku) values(3, '123');
- in the
docker-compose
console you should see
example_1 | 02:35:58.734 [ioapp-compute-2] INFO application - received EventMessage(sku,1589596558000,create,8908ecfb63e4-bin.000007,415,true,{
example_1 | "id" : 3
example_1 | },{
example_1 | "before" : null,
example_1 | "after" : {
example_1 | "id" : 3,
example_1 | "sku" : "123"
example_1 | }
example_1 | })
try to update some row and see what happens
Create:
{
"table" : "sku",
"timestamp" : 1554754028000,
"action" : "create",
"fileName" : "acd03b3a873f-bin.000003",
"offset" : 2177,
"endOfTransaction" : false,
"pk" : {
"id" : 5
},
"row" : {
"before" : null,
"after" : {
"id" : 5,
"sku" : "sku5"
}
}
}
Update:
{
"table" : "sku",
"timestamp" : 1554754028000,
"action" : "create",
"fileName" : "acd03b3a873f-bin.000003",
"offset" : 2177,
"endOfTransaction" : false,
"pk" : {
"id" : 5
},
"row" : {
"before" : {
"id" : 5,
"sku" : "sku5"
},
"after" : {
"id" : 5,
"sku" : "new sku 5"
}
}
}
Delete:
{
"table" : "sku",
"timestamp" : 1554754028000,
"action" : "create",
"fileName" : "acd03b3a873f-bin.000003",
"offset" : 2177,
"endOfTransaction" : false,
"pk" : {
"id" : 5
},
"row" : {
"before" : {
"id" : 5,
"sku" : "new sku 5"
},
"after" : null
}
}
Truncate:
{
"table" : "sku",
"timestamp" : 1554754028000,
"action" : "truncate",
"fileName" : "acd03b3a873f-bin.000003",
"offset" : 2497,
"endOfTransaction" : true,
"pk" : null,
"row" : null
}
- Scala
- FS2 Functional Streams for Scala
- circe - json streaming encoder/decoder
- doobie - database integration layer
This project is supported by YourKit with monitoring and profiling Tools. YourKit supports open source with innovative and intelligent tools for monitoring and profiling Java and .NET applications. YourKit is the creator of YourKit Java Profiler, YourKit .NET Profiler, and YourKit YouMonitor.