imrafaelmerino / vertx-mongo-effect   1.0.0-RC4

GitHub

Vertx-MongoDB-Effect enables interaction with MongoDB in a purely functional and reactive style, seamlessly integrating with Vertx-effect. This library requires familiarity with Vertx-effect, as both frameworks share a foundational reliance on immutability and persistent data structures provided by json-values.

vertx-mongodb-effect

Buy Me a Coffee

Maven

Introduction

vertx-mongodb-effect allows us to work with MongoDB following a purely functional and reactive style. It requires to be familiar with vertx-effect. Both vertx-effect and vertx-mongo-effect use the immutable and persistent Json from json-values. Jsons travel across the event bus, from verticle to verticle, back and forth, without being neither copied nor converted to BSON. The vertx codecs to send the Json from json-values to the event bus are in vertx-values, which is a dependency of vertx-effect.

Supported types

json-values supports the standard Json types: string, number, null, object, array; There are five number specializations: int, long, double, decimal, and BigInteger. json-values adds support for instants and binary data. It serializes Instants into its string representation according to ISO-8601, and the binary type into a string encoded in base 64.

vertx-mongodb-effect uses mongo-values. It abstracts the processes of encoding to BSON and decoding from BSON. Please find below the BSON types supported and their equivalent types in json-values.


Map<BsonType, Class<?>> map = new HashMap<>();
map.put(BsonType.NULL, JsNull.class);
map.put(BsonType.ARRAY, JsArray.class);
map.put(BsonType.BINARY, JsBinary.class);
map.put(BsonType.BOOLEAN, JsBool.class);
map.put(BsonType.DATE_TIME, JsInstant.class);
map.put(BsonType.DOCUMENT, JsObj.class);
map.put(BsonType.DOUBLE, JsDouble.class);
map.put(BsonType.INT32, JsInt.class);
map.put(BsonType.INT64, JsLong.class);
map.put(BsonType.DECIMAL128, JsBigDec.class);
map.put(BsonType.STRING, JsStr.class);

When defining the mongodb settings, you have to specify the codec registry JsValuesRegistry from mongo-values:

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import mongovalues.JsValuesRegistry;

MongoClientSettings  settings =
             MongoClientSettings.builder()
                                .applyConnectionString(connString)
                                .codecRegistry(JsValuesRegistry.INSTANCE)
                                .build();

Supported operations

Every method of the MongoDB driver has an associated lambda.

Since vertx-mongodb-effect uses the driver API directly, it can benefit from all its features and methods. It's an advantage over the official vertx-mongodb-client.

Please find below the types and constructors of the most essentials operations:

Count :: Lambdac<JsObj, Long>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.CountOptions;

public Count(Supplier<MongoCollection<JsObj>> collectionSupplier,
             CountOptions options
            )

DeleteMany :: Lambdac<JsObj, O>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.result.DeleteResult;

public DeleteMany(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<DeleteResult, O> resultConverter,
                  DeleteOptions options 
                 )
                      

DeleteOne :: Lambdac<JsObj, O>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.result.DeleteResult;

public DeleteOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                 Function<DeleteResult, O> resultConverter,
                 DeleteOptions options 
                )
                      

FindAll :: Lambdac<FindMessage, JsArray>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.FindIterable;

public FindAll(Supplier<MongoCollection<JsObj>> collectionSupplier,
               Function<FindIterable<JsObj>, JsArray> converter 
              )

FindOne :: Lambdac<FindMessage, JsObj>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.FindIterable;

public FindOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
               Function<FindIterable<JsObj>, JsObj> converter 
              )                 

FindOneAndDelete :: Lambdac<JsObj, JsObj>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndDeleteOptions;

public FindOneAndDelete(Supplier<MongoCollection<JsObj>> collectionSupplier,
                        FindOneAndDeleteOptions options
                       ) 

FindOneAndReplace :: Lambdac<UpdateMessage, JsObj>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndReplaceOptions;

public FindOneAndReplace(Supplier<MongoCollection<JsObj>> collectionSupplier,
                         FindOneAndReplaceOptions options
                        )   

FindOneAndUpdate :: Lambdac<UpdateMessage, JsObj>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndUpdateOptions;

public FindOneAndUpdate(Supplier<MongoCollection<JsObj>> collectionSupplier,
                        FindOneAndUpdateOptions options
                       )

InsertMany :: Lambdac<JsArray, R>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.client.model.InsertManyOptions;

public InsertMany(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<InsertManyResult, R> resultConverter,
                  InsertManyOptions options
                 )   

InsertOne :: Lambdac<JsObj, R>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.InsertOneOptions;

public InsertOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                 Function<InsertOneResult, R> resultConverter,
                 InsertOneOptions options
                )    

ReplaceOne :: Lambdac<UpdateMessage, O>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.model.ReplaceOptions;

public ReplaceOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<UpdateResult, O> resultConverter,
                  ReplaceOptions options
                 )   

UpdateMany :: Lambdac<UpdateMessage, O>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.model.UpdateOptions

public UpdateMany(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<UpdateResult, O> resultConverter,
                  UpdateOptions options
                 )

UpdateOne :: Lambdac<UpdateMessage, O>

import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.model.UpdateOptions

public UpdateOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                 Function<UpdateResult, O> resultConverter,
                 UpdateOptions options
                )

Defining modules

Like with vertx-effect, modules deploys verticles and expose lambdas to communicate with them. The typical scenario is to create a module per collection. We can deploy or spawn verticles.

The following modules are just a couple of examples.

We create a module where all the lambdas make read operations and spawn verticles to reach a significant level of parallelization:

import vertx.mongodb.effect.MongoModule;
import vertx.effect.Lambdac;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.ThreadingModel;

public class ReadModule extends MongoModule {

    public MyCollectionModule(final Supplier<MongoCollection<JsObj>> collection) {
        super(collection,
              new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
             );
    }

    public static Lambdac<FindMessage, Optional<JsObj>> findOne;
    public static Lambdac<FindMessage, JsArray> findAll;
    public static Lambdac<JsObj, Long> count;
    public static Lambdac<JsArray, JsArray> aggregate;

    @Override
    protected void deploy() {}  

    @Override
    protected void initialize() {
        Lambdac<FindMessage, JsObj> findOneLambda = vertxRef.spawn("find_one",
                                                                   new FindOne(collection)
                                                                  );
        this.findOne = (context,message) -> findOneLambda.apply(context,message)
                                                         .map(Optional::ofNullable);
        this.findAll = vertxRef.spawn("find_all",
                                      new FindAll(collection)
                                     );
        this.count = vertxRef.spawn("count",
                                    new Count(collection)
                                   );

        this.aggregate = vertxRef.spawn("aggregate",
                                        new Aggregate<>(collection,
                                                        Converters.aggregateResult2JsArray
                                                       )
                                       );
    }
}

We create a module where all the lambdas make delete, insert and update operations, and deploy only one instance per verticle.

import vertx.mongodb.effect.MongoModule;
import vertx.effect.Lambdac;

public class MyCollectionModule extends MongoModule {

    public MyCollectionModule(final Supplier<MongoCollection<JsObj>> collection) {
        super(collection);
    }

    public static Lambdac<JsObj, String> insertOne;
    public static Lambdac<JsObj, JsObj> deleteOne;
    public static Lambdac<UpdateMessage, JsObj> replaceOne;
    public static Lambdac<UpdateMessage, JsObj> updateOne;

    @Override
    protected void deploy() {
        this.deploy(INSERT_ONE_ADDRESS,
                    new InsertOne<>(collection,
                                    Converters.insertOneResult2HexId
                                   ),
                   );
        this.deploy(DELETE_ONE_ADDRESS,
                    new DeleteOne<>(collection,
                                    Converters.deleteResult2JsObj
                                   )
                   );

        this.deploy(REPLACE_ONE_ADDRESS,
                    new ReplaceOne<>(collection,
                                     Converters.updateResult2JsObj
                                    )
                   );
        this.deploy(UPDATE_ONE_ADDRESS,
                    new UpdateOne<>(collection,
                                    Converters.updateResult2JsObj
                                   )
                   );
    }  

    @Override
    protected void initialize() {
        this.insertOne = this.trace(INSERT_ONE_ADDRESS);
        this.deleteOne = this.trace(DELETE_ONE_ADDRESS);
        this.replaceOne = this.trace(REPLACE_ONE_ADDRESS);
        this.updateOne = this.trace(UPDATE_ONE_ADDRESS);
    }

    private static final String DELETE_ONE_ADDRESS = "delete_one";   
    private static final String UPDATE_ONE_ADDRESS = "update_one";
    private static final String REPLACE_ONE_ADDRESS = "replace_one";
    private static final String INSERT_ONE_ADDRESS = "insert_one";
    private static final String INSERT_MANY_ADDRESS = "insert_all";
    private static final String DELETE_MANY_ADDRESS = "delete_all";

}

Deploying modules

The verticles RegisterMongoEffectCodecs and RegisterJsValuesCodecs need to be deployed to register the vertx message codecs. Remember that you can't send any message to the event bus. If a message is not supported by Vertx you have to create a MessageCodec.

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import vertx.mongodb.effect.MongoVertxClient;
import vertx.effect.VertxRef;

// define every timeout if you wanna be reactive
int connectTimeoutMS =  ???;
int socketTimeoutMS = ???;
int serverSelectionTimeoutMS = ???;

String connectionUrl = 
     String.format("mongodb://localhost:27017/?connectTimeoutMS=%s&socketTimeoutMS=%s&serverSelectionTimeoutMS=%s",
                   connectTimeoutMS,
                   socketTimeoutMS,
                   serverSelectionTimeoutMS 
                  );
 
ConnectionString connString = new ConnectionString(connectionUrl);

MongoClientSettings  settings =
             MongoClientSettings.builder()
                                .applyConnectionString(connString)
                                .codecRegistry(JsValuesRegistry.INSTANCE)
                                .build();

// one vertx client per database connection 
MongoVertxClient mongoClient = new MongoVertxClient(settings);

String database = ???;
String collection = ???; 
MyCollectionModule collectionModule = 
          new MyCollectionModule(mongoClient.getCollection(database,
                                                           collection
                                                          )
                                );

VertxRef vertxRef = new VertxRef(vertx);

Quadruple.sequential(vertxRef.deployVerticle(new RegisterJsValuesCodecs()),
                     vertxRef.deployVerticle(new RegisterMongoEffectCodecs()),
                     vertxRef.deployVerticle(mongoClient),
                     vertxRef.deployVerticle(collectionModule)
                     ) 
         .get();

Once everything is up and running, enjoy your lambdas!


BiFunction<Integer,String,Val<Optional<JsObj>>> findByCode = (attempts,code) ->
          MyCollectionModule.findOne
                            .apply(FindMessage.ofFilter(JsObj.of("code",
                                                                 JsStr.of(code)
                                                                )
                                                        ) 
                                   )                    
                            .retry(e -> Failures.anyOf(MONGO_CONNECT_TIMEOUT_CODE,
                                                       MONGO_READ_TIMEOUT_CODE
                                                      ),
                                   attempts
                                  )
                            .recoverWith(e -> Val.succeed(Optional.empty()));

Publishing events

Since vertx-effect publishes the most critical events into the address vertx-effect-events, it' possible to register consumers to explode that information. You can disable this feature with the Java system property -Dpublish.events=false. Thanks to Lambdac, it's possible to correlate different events that belongs to the same transaction. Go to the vertx-effect doc for further details.

JFR support

Since vertx-effect supports JFR, all the verticle messages have an associated event and can be visualized using Java Mission Control.

Fields of a verticle message event:

- address: Address of the Verticle where the message is sent to
- result: SUCCESS OR FAILURE, dependening on what the caller receives
- failure code: In case the failure is a ReplyException, it's the failure code
- failure type: In case the failure is a ReplyException, it's the failure type
- failure message: In case the failure is a ReplyException, it's the failure message
- exception class: In case the failure is not a ReplyException, it's the exception class name
- exception message: In case the failure is not a ReplyException, it's the exception message
- duration: time since the message is sent until the response is received

Requirements

Installation

For Java 17 or higher:

<dependency>
   <groupId>com.github.imrafaelmerino</groupId>
   <artifactId>vertx-mongodb-effect</artifactId>
   <version>2.0.0</version>
</dependency>

For Java 21 or higher:

<dependency>
  <groupId>com.github.imrafaelmerino</groupId>
  <artifactId>vertx-mongodb-effect</artifactId>
  <version>3.0.0</version>
</dependency>

Running Integration Tests

Before executing the integration tests, ensure that a Mongo replica set is up and running.

#Edit this path to your particular case
PROJECT_HOME="/Users/rmerino/Projects/vertx-mongodb-effect"

CONFIGURATION_FILE_PATH="${PROJECT_HOME}/src/test/resources/conf.yml"

if [ ! -f "$CONFIGURATION_FILE_PATH" ]; then
    echo "The configuration file ${CONFIGURATION_FILE_PATH} does not exist."
fi

docker run -d -p 27017:27017 \
-v ${CONFIGURATION_FILE_PATH}:/etc/conf.yml \
--name mongo1 mongo --config "/etc/conf.yml"

docker run -d -p 27018:27017 \
-v ${CONFIGURATION_FILE_PATH}:/etc/conf.yml \
--name mongo2 mongo --config "/etc/conf.yml"

docker run -d -p 27019:27017 \
-v ${CONFIGURATION_FILE_PATH}:/etc/conf.yml \
--name mongo3 mongo --config "/etc/conf.yml" 


#let's configure the replica opening the Mongo console
docker exec -it mongo1 mongosh

and execute the following command, where 192.168.1.64 is my IP (don't use localhost):

rs.initiate(
       {
         _id: "rs0",
         members: [
           { _id: 0, host: "192.168.1.64:27017" },
           { _id: 1, host: "192.168.1.64:27018" },
           { _id: 2, host: "192.168.1.64:27019" }
         ]
       }
     )

Once the Mongo replica set is up and running, you're ready to run the integration tests.

To run the tests, use one of the following Maven commands:

  • mvn failsafe:integration-test — To run integration tests only.
  • mvn verify — To run both unit and integration tests.