Database I/O Actions
This chapter explains how to execute, compose, and control DBIOAction values.
In this chapter:
- Execute actions with
run(materialized) orstream(streaming). - Compose actions with sequencing and error-handling combinators.
- Control transaction/session behavior with
transactionallyandwithPinnedSession. - Drop down to JDBC when needed with
SimpleDBIO.
Anything that you can execute on a database, whether it is a getting the result of a query (myQuery.result), creating a table (myTable.schema.create), inserting data (myTable += item) or something else, is an instance of DBIOAction, parameterized by the result type it will produce when you execute it.
Database I/O Actions can be combined with several different combinators (see the DBIOAction class and DBIOAction object, which is also available under the alias DBIO, for details), but they will always be executed strictly sequentially and (at least conceptually) in a single database session.
In most cases you will want to use the type aliases DBIO and StreamingDBIO for non-streaming and streaming Database I/O Actions. They omit the optional effect types supported by DBIOAction.
In the code examples below we assume the following imports:
sourceimport java.sql.Blob
import javax.sql.rowset.serial.SerialBlob
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success}
import cats.effect.IO
import cats.effect.Resource
import cats.effect.unsafe.implicits.global
import slick.cats.Database
import slick.jdbc.DatabaseConfig
import slick.jdbc.H2Profile
import slick.jdbc.H2Profile.api._
import slick.ControlsConfig
If you’re new to Slick, please start with the Getting Started page.
Executing Database Actions
Start here if you want to run a query or action and get results back.
DBIOActions can be executed either with the goal of producing a fully materialized result or streaming data back from the database.
Materialized
You can use run to execute a DBIOAction on a Database and produce a materialized result. This can be, for example, a scalar query result (myTable.length.result), a collection-valued query result (myTable.to[Set].result), or any other action. Every DBIOAction supports this mode of execution.
Execution of the action starts in the background when run is called. The calling thread is not blocked. The materialized result is returned as an F[R] value (for example IO[Seq[User]]) that completes asynchronously and can be composed with the rest of your effect-based program:
sourceval q = for (c <- coffees) yield c.name
val a = q.result
val f: IO[Seq[String]] = db.run(a)
f.map {
case s => println(s"Result: $s")
}
Streaming
Collection-valued queries also support streaming results. In this case, the actual collection type is ignored and elements are streamed directly from the result set through the streaming type of the selected Slick facade.
Execution of the DBIOAction does not start until the stream is consumed.
Repeated-consumption semantics depend on the selected facade stream type. See the facade-specific Scaladoc for slick.cats.Database and slick.zio.Database.
Stream elements are signaled as soon as they become available in the streaming part of the DBIOAction. The end of the stream is signaled only after the entire action has completed. For example, when streaming inside a transaction and all elements have been delivered successfully, the stream can still fail afterwards if the transaction cannot be committed.
sourceval q = for (c <- coffees) yield c.name
val a = q.result
val s: fs2.Stream[IO, String] = db.stream(a)
// Use FS2 combinators to process the stream:
s.evalMap(name => IO(println(s"Element: $name")))
.compile
.drain
When streaming a JDBC result set, each iterator.next() call is wrapped in F.blocking, so the OS thread is only occupied during the actual row fetch. LOB values (such as Blob) are safe to access synchronously on the element because the result set pointer is not advanced until the consumer requests the next element:
sourceval q = for (c <- coffees) yield c.image
val a = q.result
val s: fs2.Stream[IO, Blob] = db.stream(a)
val bytesStream: fs2.Stream[IO, Array[Byte]] =
s.map(b => b.getBytes(0, b.length().toInt))Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.
Composing Database I/O Actions
This section covers how to build larger workflows from smaller actions.
DBIOActions describe sequences of individual actions to execute in strictly sequential order on one database session (at least conceptually), therefore the most commonly used combinators deal with sequencing. Since a DBIOAction eventually results in a Success or Failure, its combinators have to distinguish between successful and failed executions. Unless specifically noted, all combinators only apply to successful actions. Any failure aborts the sequence of execution.
Sequential Execution
The simplest combinator is DBIO.seq which takes a varargs list of actions to run in sequence, discarding their return value. If you need the return value, you can use andThen to combine two actions and keep the result of the second one. If you need both return values of two actions, there is the zip combinator. For getting all result values from a sequence of actions (of compatible types), use DBIO.sequence. All these combinators work with pre-existing DBIOActions which are composed eagerly:
sourceval ins1: DBIO[Int] = coffees += ("Colombian", 7.99)
val ins2: DBIO[Int] = coffees += ("French_Roast", 8.99)
val a1: DBIO[Unit] = DBIO.seq(ins1, ins2)
val a2: DBIO[Int] = ins1 andThen ins2
val a3: DBIO[(Int, Int)] = ins1 zip ins2
val a4: DBIO[Vector[Int]] = DBIO.sequence(Vector(ins1, ins2))
If an action depends on a previous action in the sequence, you have to compute it on the fly with flatMap or map. These two methods plus filter enable the use of for comprehensions for action sequencing.
Similar to DBIO.sequence for upfront composition, there is DBIO.fold for working with sequences of actions and composing them based on the previous result.
Error Handling
You can use andFinally to perform a cleanup action, no matter whether the previous action succeeded or failed. This is similar to using try ... finally ... in imperative Scala code. A more flexible version of andFinally is cleanUp. It lets you transform the failure and decide how to fail the resulting action if both the original one and the cleanup failed.
For even more flexible error handling use asTry and failed. Unlike with andFinally and cleanUp the resulting actions cannot be used for streaming.
Cancellation behaviour: cleanUp and andFinally run their cleanup actions on fiber cancellation โ cleanUp receives Some(CancellationException) so you can distinguish cancellation from errors. After cleanup completes, the fiber remains canceled.
asTry and failed do not intercept cancellation. If the underlying action is canceled, the fiber stays canceled and downstream flatMap continuations do not run.
Primitives
You can lift any CE3 effect F[R] into an action with DBIO.from. DBIO.liftF is an alias for DBIO.from. This allows an IO (or any other F[_]: Async value) to be used in an action sequence:
val action: DBIO[String] = for {
id <- DBIO.from(IO(java.util.UUID.randomUUID().toString))
_ <- users += User(id, "Alice")
} yield id
A pre-existing value or failure can be converted with DBIO.successful and DBIO.failed, respectively.
Debugging
The named combinator names an action. This name can be seen in debug logs if you enable the slick.basic.BasicBackend.action logger.
Transactions and Pinned Sessions {#transactions}
When executing a DBIOAction which is composed of several smaller actions, Slick acquires sessions from the connection pool and releases them again as needed so that a session is not kept in use unnecessarily while waiting for the result of a non-database computation (e.g. the function passed to flatMap that determines the next action to run). You can use withPinnedSession to force the use of a single session, keeping the existing session open even when waiting for non-database computations.
All DBIOAction combinators which combine database actions without any non-database computations in between (e.g. andThen or zip applied to two database computations) can fuse these actions for more efficient execution, with the side-effect that the fused action runs inside a single session, even without withPinnedSession.
There is a related combinator called transactionally to force the use of a transaction. This guarantees that the entire DBIOAction that is executed will either succeed or fail atomically. Without it, all database actions run in auto-commit mode. The use of a transaction always implies a pinned session.
An overload transactionally(ti) accepts a TransactionIsolation level:
action.transactionally(TransactionIsolation.Serializable)
sourceval a = (for {
ns <- coffees.filter(_.name.startsWith("ESPRESSO")).map(_.name).result
_ <- DBIO.seq(ns.map(n => coffees.filter(_.name === n).delete): _*)
} yield ()).transactionally
val f: IO[Unit] = db.run(a)Warning: Failure is not guaranteed to be atomic at the level of an individual DBIOAction that is wrapped with transactionally, so you need to be careful where you apply error recovery combinators. An actual database transaction is only created and committed or rolled back for the outermost transactionally action. Nested transactionally actions simply execute inside the existing transaction without additional savepoints.
Cancellation guarantee: in Slick 4 a transaction is rolled back not only on error but also on fiber cancellation. This guarantee was not possible with Future-based execution.
Rollbacks
In case you want to force a rollback, you can return DBIO.failed within a DBIOAction.
sourceval countAction = coffees.length.result
val rollbackAction = (coffees ++= Seq(
("Cold_Drip", new SerialBlob(Array[Byte](101))),
("Dutch_Coffee", new SerialBlob(Array[Byte](49)))
)).flatMap { _ =>
DBIO.failed(new Exception("Roll it back"))
}.transactionally
val errorHandleAction = rollbackAction.asTry.flatMap {
case Failure(e: Throwable) => DBIO.successful(e.getMessage)
case Success(_) => DBIO.successful("never reached")
}
// Here we show that that coffee count is the same before and after the attempted insert.
// We also show that the result of the action is filled in with the exception's message.
val f = db.run(countAction zip errorHandleAction zip countAction).map {
case ((initialCount, result), finalCount) =>
// init: 5, final: 5, result: Roll it back
println(s"init: ${initialCount}, final: ${finalCount}, result: ${result}")
result
}
JDBC Interoperability
Use this when you need JDBC functionality that is not directly modeled in Slick.
In order to drop down to the JDBC level for functionality that is not available in Slick, you can use a SimpleDBIO action which is run on a database thread and gets access to the JDBC Connection:
sourceval getAutoCommit = SimpleDBIO[Boolean](_.connection.getAutoCommit)
If you need to access state of the database session across multiple SimpleDBIO actions, make sure to use withPinnedSession or transactionally accordingly (see above).