ElasticIndexer4s is a library that helps you write to Elasticsearch from your Scala application. Its main purpose is to spare you from writing your own implementation for streaming data to Elasticsearch and taking care of switching aliases and deleting old indices. It builds heavily on elastic4s.
At work we had multiple Scala jobs writing Elasticsearch indices. These jobs ran on a nightly basis, and each one had its own implementation and follow-up logic for switching the alias to the new index and deleting old indices.
This was a perfect task to generify, and this library does exactly that for you: it writes to Elasticsearch and afterwards optionally switches the alias to the newly written index and deletes old versions of the index.
The only things you have to provide are some settings, an `akka.stream.scaladsl.Source[A]` and a `com.sksamuel.elastic4s.Indexable[A]`.
ElasticIndexer4s is currently available for Scala 2.12; if you need it for Scala 2.11, open an issue.
To get started with SBT, simply add the following to your `build.sbt` file:

```scala
libraryDependencies += "io.github.yannick-cw" % "elastic_indexer4s_2.12" % "0.6.4"
```
First you need a Configuration:
```scala
import com.yannick_cw.elastic_indexer4s.elasticsearch.elasic_config.ElasticWriteConfig
import com.sksamuel.elastic4s.http.ElasticNodeEndpoint

// basic configuration, without mapping, settings or analyzers
// will create an index named test_index_<current_date>
val config = ElasticWriteConfig(
  esNodeEndpoints = List(ElasticNodeEndpoint("http", "localhost", 9200, None)),
  esTargetIndexPrefix = "test_index",
  esTargetType = "documents"
)
```
Then you'll need an Akka `Source` of data together with an `Indexable`, in this case using circe to transform the documents to JSON:
```scala
import akka.stream.scaladsl.Source
import com.sksamuel.elastic4s.Indexable
import io.circe.generic.auto._
import io.circe.syntax._

case class Document(s: String)

object Document {
  // Indexable tells elastic4s how to serialize a Document to a JSON string
  implicit val indexable: Indexable[Document] = new Indexable[Document] {
    override def json(t: Document): String = t.asJson.noSpaces
  }
}

val dummySource = Source.single(Document("testDoc"))
```
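Any other `Source` works just as well, for example one over a list of documents:

```scala
// stream several documents instead of a single one
val manySource = Source(List(Document("a"), Document("b"), Document("c")))
```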
In the last step you connect the Source and run the Indexer with optional steps:
```scala
import scala.concurrent.Future
import com.yannick_cw.elastic_indexer4s.ElasticIndexer4s

// IndexError and RunResult are the library's result types
val runResult: Future[Either[IndexError, RunResult]] = ElasticIndexer4s(config)
  .from(dummySource)
  .switchAliasFrom(alias = "alias", minT = 0.95, maxT = 1.25)
  .deleteOldIndices(keep = 2, aliasProtection = true)
  .run
```
Your result will either be a `RunResult` if all went fine or an `IndexError` with detailed information on what steps succeeded and what failed.
The switching and deleting are optional.
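As a minimal sketch (assuming an implicit `ExecutionContext` is in scope), the result can be inspected like any other `Future` of an `Either`:

```scala
import scala.concurrent.ExecutionContext.Implicits.global

runResult.foreach {
  case Right(result) => println(s"indexing succeeded: $result")
  case Left(error)   => println(s"indexing failed: $error")
}
```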
The following config options are available:

| config name | meaning |
|---|---|
| `elasticNodeEndpoints: List[ElasticNodeEndpoint]` | all Elasticsearch nodes to connect to |
| `indexPrefix: String` | the prefix that will be used in the index name; a date will be appended |
| `docType: String` | name for the type of documents written to Elasticsearch, used as the Elasticsearch type |
| `mappingSetting: MappingSetting` | all mappings and settings, see below for more details |
| `writeBatchSize: Int` | the number of documents written in one batch, defaults to 50 |
| `writeConcurrentRequest: Int` | the number of parallel requests to Elasticsearch, defaults to 10 |
| `writeMaxAttempts: Int` | the number of retry attempts for a write before failure, defaults to 5 |
| `logWriteSpeedEvery: FiniteDuration` | the interval at which the number of written documents is logged, defaults to one minute |
| `waitForElasticTimeout: FiniteDuration` | time to wait for the document count before switching the alias, defaults to 5 seconds |
| `sniffCluster` | activate or deactivate the [sniffing feature](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/_usage.html) for the client |
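As a hedged sketch, overriding some of these defaults might look like the following (the extra parameter names are assumed to match the table above; check `ElasticWriteConfig` for the exact signature):

```scala
import scala.concurrent.duration._

// illustrative only: overriding batch size, parallelism and log interval
val tunedConfig = ElasticWriteConfig(
  esNodeEndpoints = List(ElasticNodeEndpoint("http", "localhost", 9200, None)),
  esTargetIndexPrefix = "test_index",
  esTargetType = "documents",
  writeBatchSize = 100,             // bigger bulk requests
  writeConcurrentRequest = 4,       // fewer parallel requests
  logWriteSpeedEvery = 30.seconds   // log throughput every 30 seconds
)
```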
The `MappingSetting` can be passed to specify mappings and settings for the index.
You can either pass a

```scala
case class TypedMappingSetting(
    analyzer: List[AnalyzerDefinition] = List.empty,
    mappings: List[MappingDefinition] = List.empty,
    shards: Option[Int] = None,
    replicas: Option[Int] = None
)
```

with analyzers and mappings defined in elastic4s syntax, or pass a

```scala
StringMappingSetting.unsafeString(source: String): Either[ParsingFailure, MappingSetting]
```

which gives you a `ParsingFailure` on creation if the string can't be parsed to JSON.
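For example, a minimal sketch of the string variant (the JSON body and the circe `ParsingFailure` import are assumptions; `StringMappingSetting` and `MappingSetting` come from the library's config package):

```scala
import io.circe.ParsingFailure

// illustrative mapping body; adjust to your index
val mappingJson =
  """{
    |  "settings": { "number_of_shards": 1, "number_of_replicas": 0 }
    |}""".stripMargin

// Left(ParsingFailure) if the string is not valid JSON
val setting: Either[ParsingFailure, MappingSetting] =
  StringMappingSetting.unsafeString(mappingJson)
```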
The `.from([source])` method requires you to provide an `Indexable` for the elements to be indexed. Alternatively
you can use `.fromBuilder` if you want to provide an implicit `RequestBuilder`. This allows for more configuration
of the indexing, e.g. you can specify which `id` or which `timestamp` to use.
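A hypothetical sketch of the idea, using the elastic4s DSL to set a custom document id (`buildRequest` and the `User` type are illustrative stand-ins, not the library's `RequestBuilder` signature):

```scala
import com.sksamuel.elastic4s.http.ElasticDsl._

case class User(id: String, name: String)

// hypothetical: build the index request yourself to control the document id
def buildRequest(indexName: String, user: User) =
  indexInto(indexName / "documents").id(user.id).fields("name" -> user.name)
```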
You can add the `.switchAliasFrom` step to the writing process if you want to switch an alias from
an old index to the newly written one. If no old index with this alias is found, the new index gets
the alias without switching.
There are three parameters you can pass to `.switchAliasFrom(alias = "alias", minT = 0.95, maxT = 1.25)`:
the `alias` to switch from and add to the new index, a `minT` threshold defining what fraction of the old
index's document count the new index must at least reach, and a `maxT` threshold defining how many more documents the
new index is allowed to have for the switch to proceed.
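Stated as code, the check amounts to something like this sketch (`switchAllowed` is illustrative, not the library's internal name):

```scala
// switching is allowed when the new count lies in [oldCount * minT, oldCount * maxT]
def switchAllowed(oldCount: Long, newCount: Long, minT: Double, maxT: Double): Boolean =
  newCount >= oldCount * minT && newCount <= oldCount * maxT

switchAllowed(oldCount = 1000, newCount = 980, minT = 0.95, maxT = 1.25) // true
```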
Another optional step is `.deleteOldIndices`, which allows you to clean up old indices.
It has two parameters, `.deleteOldIndices(keep = 2, aliasProtection = true)`, where `keep`
defines how many indices with the configured `indexPrefix` are kept in the cluster, and
`aliasProtection`, if set to `true`, prevents deletion of any index that has an alias set.
For writing the stream to Elasticsearch a `system` and `materializer` are needed. You can either pass them in to
`ElasticIndexer4s` or you can let ElasticIndexer4s create its own system.
Additionally you can pass in a `Decider` before starting to write, to control what happens on failures in the stream:

```scala
import akka.stream.Supervision

// resume on failed elements instead of failing the whole stream
val decider: Supervision.Decider = _ => Supervision.Resume

ElasticIndexer4s(config)
  .withDecider(decider)
  .from(dummySource)
```
- 0.5.0: switch to Elasticsearch 6.x.x, using the HTTP client instead of the TCP client
- 0.5.1: greatly increase the detail level of failure reporting
- 0.6.0: breaking change moving to elastic4s `6.3.3`, dropping all integration tests, as they are no longer supported by elastic4s (only by spinning up a cluster yourself or via embedded Docker)
- 0.6.5: update lots of versions and fix cluster sniffing for the REST client; also removed the cluster name from the settings, as it is not supported by the REST client