
Bind all your gRPC endpoints with a simple and magic coRouter

gRPC services are quite simple, focused and performance-oriented services.
Instead of calling them with a format that requires complex marshalling / unmarshalling over a standard HTTP 1.1 connection, gRPC uses dedicated (and therefore faster) generated code for each type.
The protocol also indexes data fields by number, which shrinks the payload considerably
(if you know the order of the keys in the structure, then you don’t need to spell them out in your payload).

gRPC works efficiently with plain Netty servers (all you have to do is call .bindService()).
But the integration is not so smooth when you already have reactive Spring components to integrate with (authentication, libraries, aspects).
When I discovered coRouters, I understood that one of their purposes is to declare endpoints dynamically based on Spring beans.

So if you mix these two concepts together (gRPC and coRouters), you can build autowired gRPC services on Spring.
How do you do that?
Simply by copy / pasting this snippet into your codebase: https://gist.github.com/libetl/a655de480ed4d123e0c10fe557ea4271
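
To give a rough idea of the coRouter DSL this relies on, here is a minimal, hedged sketch (the service, message types and path below are hypothetical and are not the content of the gist) of an endpoint declared dynamically from an autowired bean:

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.awaitBody
import org.springframework.web.reactive.function.server.bodyValueAndAwait
import org.springframework.web.reactive.function.server.coRouter

// hypothetical message types and service bean, for illustration only
data class HelloRequest(val name: String)
data class HelloReply(val message: String)

class GreeterService {
    suspend fun sayHello(request: HelloRequest) = HelloReply("Hello ${request.name}")
}

@Configuration
class GreeterRoutes {

    @Bean
    fun greeterService() = GreeterService()

    // the coRouter DSL declares endpoints from Spring beans at startup;
    // the gist above builds on the same idea to bind the gRPC methods
    @Bean
    fun greeterRouter(greeterService: GreeterService) = coRouter {
        POST("/helloworld.Greeter/SayHello") { request ->
            ServerResponse.ok().bodyValueAndAwait(greeterService.sayHello(request.awaitBody()))
        }
    }
}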


Kotlin Coroutines utilities

If you prefer to combine Kotlin Coroutines with popular solutions like Flowable, Streams or RxJava, where every strategy is readily available, this post is not for you.

Otherwise, if you are interested in writing your own asynchronous strategies, here are two of them (in addition to my post from April 2019: https://libetl.wordpress.com/2019/04/30/learn-how-to-group-expensive-calls-with-the-coroutines/).

Batching strategy
This strategy helps you stream your “Extract, transform, load” program by launching a parallel, correctly throttled execution of a massive process.
In other words, regardless of how many rows are in your dataset, you are able to process them all.

Not too slowly, and not too fast either, to avoid sending more throughput than your downstream actor can bear.
The batching strategy is thus a kind of streaming facility.

It basically consists in keeping n workers busy with one row each until all the rows have been processed.
The strategy is initialized on first use, so the following datasets can be processed with less “warm-up” time.
Source code of that strategy: https://gist.github.com/libetl/71b826a0db248e6770a2c0b5c0ae6d18#file-batchcoroutinesstrategy-kt
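
For a rough idea of how such a strategy can be built (a minimal sketch only, with hypothetical names; the gist’s actual API differs), n workers can consume rows from a channel until the dataset is exhausted:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// minimal sketch: "workers" coroutines pull rows from a rendezvous channel until it is closed,
// which caps the concurrency (and therefore the downstream throughput) at "workers"
suspend fun <T> processAllRows(
    rows: List<T>,
    workers: Int = 20,
    process: suspend (T) -> Unit
) = coroutineScope {
    val channel = Channel<T>()
    repeat(workers) {
        launch { for (row in channel) process(row) }
    }
    rows.forEach { channel.send(it) }
    channel.close() // the workers drain the remaining rows and stop
}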

Caching Strategy
Want to keep long-running computation results in your program’s memory after they have been processed? That sounds interesting when your client requests some data and you cannot respond in a reasonable amount of time (more than 5 seconds).
Give your client a UUID and tell it to come back later with that UUID.

When a client requests a UUID whose task is not finished yet, you can just reply “oh, it is not ready yet”.
If it is done, “here are the results”,
otherwise “sorry, apparently that UUID does not correspond to a task run on this machine”.

That strategy consists in a cache object (a map of UUID to results), a worker to run async tasks, a “cacheAdder”, and a method to poll the status of a task.
Basically, the job starts by sending a message to the worker, which, after completion, sends the result to the cacheAdder. The cache is configured to automatically expire elements 10 minutes after the last read.
Source code of that strategy: https://gist.github.com/libetl/71b826a0db248e6770a2c0b5c0ae6d18#file-cachingcoroutinesstrategy-kt
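
Here is a very reduced sketch of the idea (hypothetical names; the dedicated worker, the cacheAdder and the 10-minute expiry from the gist are not shown here):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

sealed class TaskStatus<out T> {
    object NotReady : TaskStatus<Nothing>()      // task still running
    object Unknown : TaskStatus<Nothing>()       // UUID never seen on this machine
    data class Done<T>(val result: T) : TaskStatus<T>()
}

class LongTaskCache<T>(private val scope: CoroutineScope) {
    // map of UUID to the running or completed task (no expiry in this sketch)
    private val cache = ConcurrentHashMap<UUID, Deferred<T>>()

    // start the computation and hand the UUID back to the client immediately
    fun submit(block: suspend () -> T): UUID {
        val id = UUID.randomUUID()
        cache[id] = scope.async { block() }
        return id
    }

    // the client comes back later with its UUID and polls the status
    suspend fun poll(id: UUID): TaskStatus<T> {
        val task = cache[id] ?: return TaskStatus.Unknown
        return if (task.isCompleted) TaskStatus.Done(task.await()) else TaskStatus.NotReady
    }
}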

Can I combine them?
Absolutely. Here are the declarations to have a batching strategy with a cache:

private val batch =
    batchingStrategy.batchOf(
        workers = 20,
        coroutineContext = coroutineContext
    ) {
        letsProcess(it)
        // this is where you tell what to do
        // for each element in your dataset
    }

private val batchWithCache =
    cachingStrategy.cache(
        workers = 20,
        coroutineContext = coroutineContext
    ) {
        batch(it).await()
        // "it" represents your data
        // the result of "await" is the
        // global result of the operation.
        // you can add further operations there
    }


Coroutines in Spring API: give me!

Coroutines are a sharply growing practice among Android developers, but the practice seems quite limited to the Android ecosystem, even though their original intent is to offer an agnostic framework for writing asynchronous programs with cheap concurrency.

There is already a framework that helps combine coroutines and Spring MVC: https://github.com/konrad-kaminski/spring-kotlin-coroutine

This framework works nicely for hello-world or CRUD APIs, but hardly adapts when you have Spring Security configured, a LocaleContextHolder, an MDC map, or maybe also a RequestContextHolder attributes object.

The very rough implementation below will help you declare a CoroutineContext bean that you can bind directly in your RestController if you want to use coroutines seamlessly.
It makes a coroutineContext parameter available, and also makes the controller wait until the result has been resolved.

package com.company.myapi

import kotlinx.coroutines.ExecutorCoroutineDispatcher
import org.slf4j.MDC
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.context.i18n.LocaleContextHolder
import org.springframework.core.MethodParameter
import org.springframework.security.core.context.SecurityContextHolder
import org.springframework.security.core.parameters.DefaultSecurityParameterNameDiscoverer
import org.springframework.web.bind.support.WebDataBinderFactory
import org.springframework.web.context.request.NativeWebRequest
import org.springframework.web.context.request.RequestContextHolder
import org.springframework.web.context.request.async.DeferredResult
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler
import org.springframework.web.method.support.HandlerMethodArgumentResolver
import org.springframework.web.method.support.HandlerMethodReturnValueHandler
import org.springframework.web.method.support.ModelAndViewContainer
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer
import org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler
import java.lang.reflect.Method
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.reflect.jvm.kotlinFunction

object CoroutinesInRestController {

    @Configuration
    internal class CoroutinesInjection {
        @Bean
        @Primary
        fun parameterNameDiscovererWithCoroutines() =
            object : DefaultSecurityParameterNameDiscoverer(listOf()) {
                override fun getParameterNames(method: Method) =
                    ((super.getParameterNames(method)?.toList() ?: listOf()) +
                        if (method.isSuspend) listOf("__continuation__") else listOf()).toTypedArray()
            }

        @Bean
        fun coroutineContext() = object : ExecutorCoroutineDispatcher() {
            override val executor = Executors.newFixedThreadPool(128)

            override fun dispatch(context: CoroutineContext, block: Runnable) {
                val securityContext = SecurityContextHolder.getContext()
                val requestAttributes = RequestContextHolder.currentRequestAttributes()
                val locale = LocaleContextHolder.getLocale()
                val contextMap = MDC.getCopyOfContextMap()
                executor.execute {
                    SecurityContextHolder.setContext(securityContext)
                    RequestContextHolder.setRequestAttributes(requestAttributes)
                    LocaleContextHolder.setLocale(locale)
                    MDC.setContextMap(contextMap)
                    block.run()
                }
            }

            override fun close() {
                executor.shutdown()
            }
        }
    }

    @Configuration
    internal class CoroutinesWebMvcConfigurer : WebMvcConfigurer {

        @Autowired
        private lateinit var coroutineContext: CoroutineContext

        override fun addArgumentResolvers(resolvers: MutableList<HandlerMethodArgumentResolver>) {
            resolvers.add(0, coroutineArgumentResolver(coroutineContext))
        }

        override fun addReturnValueHandlers(handlers: MutableList<HandlerMethodReturnValueHandler>) {
            handlers.add(0, returnValueHandler())
        }
    }

    private const val DEFERRED_RESULT = "deferred_result"

    private fun <T> isContinuationClass(clazz: Class<T>) = Continuation::class.java.isAssignableFrom(clazz)
    val Method?.isSuspend: Boolean get() = this?.kotlinFunction?.isSuspend ?: false

    fun coroutineArgumentResolver(coroutineContext: CoroutineContext) =
        object : HandlerMethodArgumentResolver {
            override fun supportsParameter(parameter: MethodParameter) =
                parameter.method.isSuspend && isContinuationClass(parameter.parameterType)

            override fun resolveArgument(parameter: MethodParameter, mavContainer: ModelAndViewContainer,
                                         webRequest: NativeWebRequest, binderFactory: WebDataBinderFactory) =
                object : Continuation<Any> {
                    val deferredResult = DeferredResult<Any>()

                    override val context: CoroutineContext
                        get() = coroutineContext

                    override fun resumeWith(result: Result<Any>) {
                        if (result.isSuccess) {
                            deferredResult.setResult(result.getOrNull())
                        } else {
                            deferredResult.setErrorResult(result.exceptionOrNull())
                        }
                    }
                }.apply {
                    mavContainer.model[DEFERRED_RESULT] = deferredResult
                }
        }

    fun returnValueHandler() =
        object : AsyncHandlerMethodReturnValueHandler {
            private val delegate = DeferredResultMethodReturnValueHandler()

            override fun supportsReturnType(returnType: MethodParameter): Boolean =
                returnType.method.isSuspend

            override fun handleReturnValue(returnValue: Any?, type: MethodParameter,
                                           mavContainer: ModelAndViewContainer, webRequest: NativeWebRequest) {
                val result = mavContainer.model[DEFERRED_RESULT] as DeferredResult<*>

                return delegate.handleReturnValue(result, type, mavContainer, webRequest)
            }

            override fun isAsyncReturnValue(returnValue: Any, returnType: MethodParameter): Boolean =
                returnValue === COROUTINE_SUSPENDED
        }
}

This special implementation takes care of keeping the ThreadLocal values up to date in each coroutine scope, so you can keep processing your request without losing the servlet context values.
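
As a quick usage sketch (the controller and endpoint below are hypothetical), a suspend handler then looks like any other handler:

import kotlinx.coroutines.delay
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

// hypothetical controller: the argument resolver above supplies the continuation,
// and the return value handler turns the suspension into a DeferredResult
@RestController
class GreetingController {

    @GetMapping("/greeting")
    suspend fun greeting(): String {
        delay(1000) // suspends without blocking the servlet thread
        return "hello from a coroutine"
    }
}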

That was useful for me, and maybe it will be for you too, so good luck.


Learn how to group expensive calls with the coroutines

In a micro-service fashion, everything is streamlined into small domains, small actions and therefore fast responses.

This is theoretically true, BUT. Sometimes we need information that is expensive to fetch: either a partner API which is slow, or restricted access to a confidential network which takes seconds to route correctly.

You run the risk of the client API retrying the same request multiple times, because the data has no fallback information (you cannot find a fallback for a credit card, for example).

In this case, Nagle’s algorithm will not work: the retries happen every N seconds, and the packets get acknowledged before any grouping can happen.

It is time, if not already too late, to group your API calls together: all the inbound retries will subscribe to the same backend call. One technology that does this well is Kotlin coroutines.

We are going to create two roles in this process: a router that maps each request to one task, and several workers whose role is to handle a task. The workers, which are not threads but coroutines, keep switching contexts to avoid wasting CPU time.

The behavior is as follows: the client API sends its input to the router, which replies with either a new task or a currently running task, and then waits until the task completes. The only intelligence there is to compare the input with the other saved inputs to see whether two of them are the same; that is how the router decides to merge two requests into one.
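
As a stripped-down sketch of the grouping idea (hypothetical names, not the repository’s actual router / worker API), identical in-flight inputs can subscribe to a single backend call:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import java.util.concurrent.ConcurrentHashMap

class GroupingClient<I : Any, O>(
    private val scope: CoroutineScope,
    private val backendCall: suspend (I) -> O
) {
    // one task per distinct input currently in flight
    private val inFlight = ConcurrentHashMap<I, Deferred<O>>()

    suspend fun call(input: I): O {
        // every caller with the same input awaits the same task,
        // so retries never trigger a second backend call
        val task = inFlight.computeIfAbsent(input) {
            scope.async {
                try { backendCall(input) } finally { inFlight.remove(input) }
            }
        }
        return task.await()
    }
}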

The repository on GitHub is this one: https://github.com/libetl/coroutines-pooling-client

The test runs two scenarios: one with 100 requests carrying different inputs, and one with 100 requests that are exactly identical. Each backend call is logged.

In the first scenario, each inbound call leads to a call to the backend (100 different inputs).

In the second scenario, there are 100 inbound calls but only 10 calls to the backend (that number even decreases further if the latency of the backend increases). Why? As you have guessed, some calls are grouped together.

Coroutines can help you implement a wide range of strategies for your use cases, but what I foresee the most is the need for seamless implementations when you need your code to stay stupid and simple. The coroutines framework makes that easy by keeping the business code separate from the coroutine boilerplate.

So now perf developers and craftsmen can become friends again.