For use in Spark jobs to populate a Weaviate vector database.
You can choose one of the following options to install the Weaviate Spark Connector:
You can download the latest JAR from GitHub releases here.
To use in your own Spark job you will first need to build the fat jar of the package by running
sbt assembly
which will create the artifact in ./target/scala-2.12/spark-connector-assembly-1.2.8.jar
You can configure spark-shell or tools like spark-submit to use the JAR like this:
spark-shell --jars spark-connector-assembly-1.3.3.jar
To run on Databricks simply upload the jar file to your cluster in the libraries tab as in the below image.
After installation your cluster page should look something like this.
You can also use Maven to include the Weaviate Spark Connector as dependency in your Spark application. See here.
Running cross versions tests:
sbt -v +test
Building Scala 2.12
and Scala 2.13
binaries:
sbt +assembly
First create a schema in Weaviate as the connector will not create one automatically. See the tutorial for how to do this.
Afterwards loading data from Spark is as easy as this!
(
my_df
.write
.format("io.weaviate.spark.Weaviate")
.option("scheme", "http")
.option("host", weaviate_host)
.option("className", "MyClass")
.mode("append")
.save()
)
If you already have vectors available in your dataframe (recommended for batch insert performance) you can easily supply them with the vector option.
(
my_df
.write
.format("io.weaviate.spark.Weaviate")
.option("scheme", "http")
.option("host", weaviate_host)
.option("className", "MyClass")
.option("vector", vector_column_name)
.mode("append")
.save()
)
By default the Weaviate client will create document IDs for you for new documents but if you already have IDs you can also supply those in the dataframe.
(
my_df
.write
.format("io.weaviate.spark.Weaviate")
.option("scheme", "http")
.option("host", weaviate_host)
.option("className", "MyClass")
.option("id", id_column_name)
.mode("append")
.save()
)
For authenticated clusters such as with WCS the apiKey
option can be used. Further options including OIDC and custom headers are listed in the tutorial.
(
my_df
.write
.format("io.weaviate.spark.Weaviate")
.option("scheme", "https")
.option("host", "demo-endpoint.weaviate.network")
.option("apiKey", WEAVIATE_API_KEY)
.option("className", "MyClass")
.option("batchSize", 100)
.option("vector", vector_column_name)
.option("id", id_column_name)
.mode("append")
.save()
)
Currently only the append write mode is supported. We do not yet support upsert or error if exists write semantics.
Both batch operations and streaming writes are supported.
The connector is able to automatically infer the correct Spark DataType based on your schema for the class in Weaviate. Your DataFrame column name needs to match the property name of your class in Weaviate. The table below shows how the connector infers the DataType:
Weaviate DataType | Spark DataType | Notes |
---|---|---|
string | StringType | |
string[] | Array[StringType] | |
int | IntegerType | Weaviate only supports int32 for now. More info here. |
int[] | Array[IntegerType] | |
boolean | BooleanType | |
boolean[] | Array[BooleanType] | |
number | DoubleType | |
number[] | Array[DoubleType] | |
date | DateType | |
date[] | Array[DateType] | |
text | StringType | |
text[] | StringType | |
geoCoordinates | StringType | |
phoneNumber | StringType | |
blob | StringType | Encode your blob as base64 string |
vector | Array[FloatType] | |
cross reference | string | Not supported yet |
Please also take a look at the Weaviate data types docs and the Spark DataType docs.
This repository uses SBT to compile the code. SBT can be installed on MacOS following the instructions here.
You will also need Java 8+ and Scala 2.12 installed. The easiest way to get everything set up is to install IntelliJ.
To compile the package simply run sbt compile
to ensure that you have everything needed to run the Spark connector.
The unit and integration tests can be run via sbt test
.
The integration tests stand up a local Weaviate instance running in docker and then run the Apache Spark code in a separate docker container. You will need to have docker running to run all tests.
sbt assembly
docker build -t spark-with-weaviate .
docker run -it spark-with-weaviate /opt/spark/bin/spark-shell
case class Article (title: String, content: String)
val articles = Seq( Article("Sam", "Sam")).toDF
articles.write.format("io.weaviate.spark.Weaviate")
.option("scheme", "http")
.option("host", "localhost:8080")
.mode("append").save()