I’ve been working with Spark for a few years now, and have pushed it into realms where it was not really meant to go. Over this time, I have written a lot of custom aggregators, catalyst expressions, custom RDDs and streaming interfaces.
This is the second part of this series, and is a short one.
Spark provides quite a rich set of interfaces for writing custom functions and aggregates to its runtime.
Unfortunately, most of these interfaces are considered relatively internal, and relatively complicated; so the Spark developers have exposed an interface for user defined functions.
User Defined Functions (UDFs) and their relatives, User Defined Aggregate Functions (UDAFs), have reasonable support in Scala, Python, and R, and work fine for a good many jobs.
The biggest issue with these, however, is that they automatically marshal values to and from the host language using reflection, and their semantics with regard to polymorphic expressions are either non-existent or drastically broken. They are also limited in even the monomorphic types they can work with, and types like maps and structures just don’t seem to work.
Luckily though, when working in Scala, one can use the interfaces Spark uses itself to add the required functionality.
There is a good range of these, including TypedImperativeAggregate, DeclarativeAggregate, and the plain old Expression.
Here, we’re going to quickly define a polymorphic identity function as a UnaryExpression, which is a simple extension of Expression with just one child in the expression tree.
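import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.DataType

// A unary expression has exactly one child, so it is a single Expression, not a Seq.
case class Ident(child: Expression)
    extends UnaryExpression with CodegenFallback {
  override def prettyName: String = "ident"

  // Polymorphic: the result type is whatever the child's type is.
  override def dataType: DataType = child.dataType

  // Evaluate the child and pass its value through untouched.
  override def eval(input: InternalRow): Any = child.eval(input)
}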
The CodegenFallback trait here says that we’re not going to write custom code generation for this expression. When Spark does a full stage compilation, the generated code simply calls back into our interpreted eval method.
Using this function from Scala code is quite simple. We just need to provide a function which operates on a Column by unwrapping its internal expression, wrapping that in Ident, and re-wrapping the result. I tend to just use the apply method of a companion object:
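import org.apache.spark.sql.Column

object Ident {
  // Unwrap the Column, wrap its expression in Ident, and re-wrap as a Column.
  def apply(child: Column): Column =
    new Column(Ident(child.expr))
}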
We can now just use this function in a Spark select, for instance, or anywhere else a Column is required.
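As a rough sketch of what that looks like, here is Ident applied to a long, a map and a struct column in a single select. It assumes a Spark 2.x build, a local SparkSession, and that the Ident case class and companion object from this post are compiled alongside it. The object name IdentSelectDemo and the column names are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, map, struct}

// Hypothetical demo object. Assumes the Ident case class and its companion
// from this post are on the classpath, built against Spark 2.x.
object IdentSelectDemo {
  def passThrough(spark: SparkSession): DataFrame = {
    val df = spark.range(3).select(
      col("id"),                              // a long column
      map(lit("k"), col("id")).as("m"),       // a map column
      struct(col("id").as("inner")).as("s"))  // a struct column

    // The same Ident wraps all three. Each output column takes its type
    // from Ident.dataType, which just forwards the child's type.
    df.select(
      Ident(col("id")).as("id"),
      Ident(col("m")).as("m"),
      Ident(col("s")).as("s"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("ident-select")
      .getOrCreate()
    try passThrough(spark).printSchema()
    finally spark.stop()
  }
}
```

Maps and structs are exactly the kinds of types the UDF route struggles with, so they make a reasonable smoke test for the Column wrapper.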
However, if we want to provide this so we can use it in a parsed SQL text expression, we’re put in a very difficult place. The Spark developers have provided a way to register UDFs and native Scala functions so that they are surfaced in SQL, but not functions written the way Spark’s own built-in functions are defined!
Here’s what we’ve been given for a unary function like the one we’d like to define:
// Signatures only; method bodies elided.
class UDFRegistration extends Logging {
  def register[RT: TypeTag, A1: TypeTag](name: String, func: (A1) ⇒ RT): UserDefinedFunction

  def register(name: String, udf: UserDefinedFunction): UserDefinedFunction
}
The first of these allows us to use a standard Scala function as a Spark expression, but requires that its type is monomorphic and able to be reflected upon (behind the scenes, Spark will perform an implicit cast so that the types line up).
The second is relatively uninteresting, as a UserDefinedFunction is created by the function udf, which uses the same reflection techniques as the first register.
What we can do though, is utterly abuse this second function, and Scala’s class inheritance. The interesting thing about the register function is that the only method it calls is the UDF’s apply method, which is extremely similar to the one we wrote above. So what we can do is extend the UserDefinedFunction class and override its apply method.
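To see why overriding apply is enough, here is a small Spark-free toy model of the mechanism. None of these names (Col, ToyUdf, ToyRegistry, ToyIdent) are Spark classes. They are hypothetical stand-ins for Column, UserDefinedFunction, UDFRegistration and our Ident object, and the real classes do a great deal more.

```scala
import scala.collection.mutable

// Toy stand-in for Column: just carries a textual "expression".
final case class Col(expr: String)

// Toy stand-in for UserDefinedFunction: apply wraps its arguments in a call.
class ToyUdf(name: String) {
  def apply(cols: Col*): Col = {
    val args = cols.map(_.expr).mkString(", ")
    Col(name + "(" + args + ")")
  }
}

// Toy stand-in for UDFRegistration: the only thing it ever does with
// the UDF is call its varargs apply method.
class ToyRegistry {
  private val builders = mutable.Map.empty[String, Seq[Col] => Col]

  def register(name: String, udf: ToyUdf): ToyUdf = {
    builders(name) = (args: Seq[Col]) => udf.apply(args: _*)
    udf
  }

  def call(name: String, args: Col*): Col = builders(name)(args)
}

// Because the registry goes through apply, a subclass that overrides apply
// decides what gets built. The constructor argument is a dummy.
object ToyIdent extends ToyUdf("unused") {
  override def apply(cols: Col*): Col = cols.toList match {
    case child :: Nil => child
    case _ => throw new IllegalArgumentException("ident takes one argument")
  }
}
```

Registering ToyIdent and then calling it through the registry hands back the argument untouched, while an ordinary ToyUdf still produces a wrapped call. The dispatch is plain virtual method lookup, nothing cleverer than that.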
We need some dummy parameters for the UserDefinedFunction case class constructor, but don’t worry, they’re never going to be used.
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.NullType

object Ident extends UserDefinedFunction(identity[Nothing] _, NullType, None) {
  // What we had before to make our Scala code nicer
  def apply(child: Column): Column =
    new Column(Ident(child.expr))

  // How we override the UDF's apply method, allowing
  // it to be used in imported SQL literals and expressions.
  override def apply(exprs: Column*): Column =
    exprs.toList match {
      case child :: Nil =>
        Ident(child)
      case _ =>
        throw new Exception("Ident takes one argument")
    }
}
And that’s about it: we can now register this custom Catalyst expression, even though the Spark developers didn’t really seem to think we would want to.
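For completeness, a minimal sketch of the registration and a SQL call. It assumes a Spark 2.x build, where UserDefinedFunction is a case class that can still be extended, a local SparkSession, and the Ident case class plus the UserDefinedFunction-extending Ident object from this post on the classpath. The object name IdentSqlDemo and the view name numbers are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object. Assumes the Ident definitions from this post
// are on the classpath, built against Spark 2.x.
object IdentSqlDemo {
  def viaSql(spark: SparkSession): Seq[Long] = {
    // register only needs something typed as UserDefinedFunction,
    // and our Ident object now qualifies.
    spark.udf.register("ident", Ident)

    spark.range(3).createOrReplaceTempView("numbers") // hypothetical view name

    // The parsed SQL text resolves ident(...) through the registry,
    // which ends up in our overridden apply.
    spark.sql("SELECT ident(id) AS id FROM numbers ORDER BY id")
      .collect()
      .map(_.getLong(0))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("ident-sql")
      .getOrCreate()
    try println(viaSql(spark))
    finally spark.stop()
  }
}
```

The same registered name should also be usable from selectExpr, since that goes through the same SQL expression parser.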
I’ve been using this trick a fair bit recently, as writing UDFs is extremely limiting when any form of polymorphism is required.