Scalable Fraud Detection for Zalando's Fashion Platform
Team Payana have been busy: Read about their migration efforts for the Zalando platform.
Longread: 15 minutes/3,282 words
Zalando’s vision of growing from an online fashion retailer to a fashion platform not only opens up internal Zalando services to external partners, but also dramatically increases the amount of data that flows through the company's backend.
At Zalando, it’s up to individual teams to ensure that the services they own are ready for this challenge. This poses great responsibility, but also allows each team to evaluate which technology stack they should use to tackle the challenge.
As part of the Data Engineering department, our team provides a service that estimates the fraud risk for incoming orders. In a nutshell, we estimate the risk of a customer's order by using machine learning models that were learned with historical order data.
Moving to the platform world also challenged our team in different ways. On the one hand, we need to be able to make predictions on an unforeseen amount of orders in real-time. On the other hand, we need to be able to update existing machine learning models in short intervals.
Coming from a data science background, our first solution to these challenges was implemented with a Python system that uses scikit-learn for the machine learning part. However, we soon discovered that this solution does not scale as required with the new platform vision. Hence, we decided to migrate the existing system to a new solution which is based on Spark and Scala.
This post is about the journey of this migration. We will briefly sketch out our old solution, outline the pain points, and show how they were relieved by Spark and Scala. We will also share the lessons we have learned on our journey and discuss some ongoing problems.
The two use cases and the old framework
Our old implementation solves a twofold challenge: On the one hand there are a number of models that need to be trained on historical data; on the other hand these models are used in production to deliver real-time predictions. These two use cases are closely related but each comes with its specifics; see the figure below for an illustration.
In the live system (the “prediction data flow”), order data comes in the form of a JSON request. The request’s data fields are read into a data access object, which is used to compute all features and yield a data point. This initial data point may have missing values since the data may be corrupt or incomplete. Thus, an imputer is used to fill in meaningful default values for them. The complete data point is now used as input to the final model, which delivers a fraud probability, `P_f`. This value is then used by the consumers of the service to decide how to proceed with the order. Along this path, pre-trained models are required for each step: Some of the features, a few pre-processors, the imputer, the final prediction model.
Refer to the figure below for an example:
The learning procedure for these models is similar (the “learning data flow”); it involves acquisition of the data, filtering the relevant records (e.g. particular periods, countries), putting them in the shape of requests, extracting the valid data points, and finally, the training.
Our previous solution addressed these challenges in terms of Python’s CherryPy for serving requests in the prediction data flow, and with scikit-learn for the machine-learning part, which is running on the company’s in-house cluster for crunching the numbers. We use JSON configuration files to specify the features of a model, its submodels and preprocessors, the data filtering and the pre-processing steps. We’ve built an infrastructure to read these specifications, fetch the relevant data from the repositories, and schedule the learning jobs on the cluster, parallelising where possible (e.g. independent submodels). For deployment, the trained models are saved in a binary format and copied to a write-once-read-many (WORM) drive to ensure their integrity. To do the real-time predictions, a correspondingly configured RESTful service is deployed in a data center. Over time, this evolved to hundreds and hundreds of model files and nearly one hundred deploy units.
Pain points
While our Python-based solution has been doing a decent job for the last few years, the situation was gradually changing with the increasing amount of data and overall complexity that was growing into the system. Having the new platform vision in the back of our minds, several pain points became obvious and cast doubt on the system’s future-proofness:
- While Python is certainly a very good choice to rapidly prototype products, it also has some major drawbacks. Python’s interpreted execution and dynamic type system goes along with complicated maintenance of the production systems; we often found ourselves facing production issues that could have been prevented in the first place. Additionally, the Python interpreter does not support pure multithreading. This regularly prevented our fraud prediction system from scaling up to higher loads.
- For the learning of our models, we pushed the size of the input data to the memory limits of the in-house cluster. Since the platform vision requires us to scale to almost any size of data, we need a more flexible solution here.
- The complexity of our system has been growing with every new model and feature added to the system. At some point, the configuration of the models became unmanageable. At the same time, our codebase was already too complex to start a reliable refactoring effort to allow us to fix the configuration hell. One reason for this is that the plain JSON config files don’t allow you to easily reuse or nest common pieces.
- We ran at the limits of the in-house infrastructure. Our jobs compete with other critical ones for computational resources on the static cluster to learn models, and for access to a busy analytics database to acquire training data. Both resources turned to bottlenecks, slowing down our availability to quickly learn new models.
Introducing the new system
Faced with the aforementioned challenges, we sat down and discussed alternatives to provide a platform-ready fraud-assessment system. This was around the same time when the word about the Big Data processing framework Apache Spark reached us and the whole Data Science community. In a nutshell, Spark promises to distribute your data processing task seamlessly to a set of worker machines, which then work on their own fraction of data in parallel. This implies that you can seamlessly scale up your processing power and memory. Moreover, Spark comes with APIs for the most important machine learning algorithms, while it can connect to almost any major data source. It is also written in Scala, a JVM-based language, which provides the type-safety we were missing before. It was quite obvious: Spark promised we could get rid of most of our pain points.
The learn framework
When beginning the implementation of our new system with Spark in a more scalable fashion, we not only wanted to reproduce our old system, but also improve on some aspects. Hence, the redesign of the learn framework centers around three basic principles: Learning and data acquisition should be fundamentally separated; a model is a tree of models; and configuration is code.
Data and learning
Querying the data sources for each learning session slows down the overall process of producing the models. And the more often we need to do this, as the fashion platform context imposes on the system, the more we get pushed back by it. Thus, the first benefit of separating data collection from the process of learning is that it removes the bottleneck associated with the major data source we use – a busy analytics database.
Technically, the data is loaded from the sources (databases, logs, etc.), put in a common format, and stored on Amazon’s S3 where it is available for learning on AWS. The other effect of this decoupling is that including a new source is matter of enhancing the data collection, which stays oblique to the learning part and allows us to include or substitute arbitrary sources without touching the learning part unnecessarily.
Trees of models
A central design decision is how features and models relate and interact. As shown in the figure below, we view a model as being a common feature, and at the same time, a model can have many features. A model is comprised of a predictor (e.g. linear regression), a bunch of preprocessors, and the aforementioned collection of features. This gives rise to a hierarchical structure where every model itself can be considered input of a higher order model. In this way, along with trivial features, which are a simple function of the fields of a record, we can just as easily specify features, which are an aggregation of many records, and thus need to be “trained”.
This provides for a very powerful, conceptually simple tool to compose models. Both prediction and learning are aware of this structure and can take care of doing the right thing in the right order with respect to the implicit dependency graph. For instance, the model in the figure above would only be asked to predict the probability once its features are complete, i.e. the submodels have delivered the prediction; for learning, the model would only be trained once the submodels had been trained and could be used to provide the needed features.
Configuration is a class constructor
Configuration is as important for the result of any computer program as code. In fact, configuration can have bugs, needs to be documented, reviewed, tested. It should be given as much care as code. What if configuration *was* code?
In the designing phase of the new framework, looking at the configuration files for our models we figured out that, in terms of OOP, they resemble the parameters of nested constructors. As an obvious thing, we would parse the JSONs and initiate the construction of the models. However, one can just as well bypass this error-prone step and specify the parameters of the constructors in the code itself!
Having the configuration compiled and type-checked at compile time with Scala’s sophisticated type system brings it to another level of protection against trivial and hard to find syntactic and semantic bugs. It allows the model design to fail early when non-critical and forbids big abuse of mechanisms – all without writing a single test, just by means of proper typization.
Furthermore, models and features defined as classes can be reused across different configuration “files” in a consistent manner, which brings about conciseness, too.
In the same line of thought, having implemented a minimal domain specific language, we can specify which jobs to be thrown at the cluster, as in Learn(model, …) andThen Predict(model, …):
package de.zalando.payana.lf.model.appDe
case class DynamicFeat1() extends BasicScorer(
"SecretScore1", SecretScorer1(), Seq("1970-01"))
case class DynamicFeat2() extends BasicScorer(
"SecretScore2", SecretScorer2(), Seq("1970-02", "1970-03"))
case class DynamicFeat3() extends BasicScorer(
"SecretScore3", SecretScorer3(), Seq("1970-04"))
case class DynamicFeat4() extends BasicScorer(
"SecretScore4", SecretScorer4(), Seq("1970-05", "1970-06"))
case class Forest_1970_09() extends Model[OrderDao](
randomForest("model_1970_09"),
BasicFeatures ++ CustomerHistoryFeatures ++
Seq(DynamicFeat1(), DynamicFeat2(), DynamicFeat3(), DynamicFeat4()),
Seq("1970-09", "1970-10", "1970-11")
)
class Job_2016_02_02 extends Job {
override def execute(implicit ctx: Context): Unit = {
...
val allModels = Seq(Forest_1970_09(), Ridge_1970_09())
for (model <- allModels) {
Learn(model, ...) andThen Predict(model, ...)
}
}
}
In general, having job specification and configuration in the same codebase, tied to the strictly typed framework, makes experiments better documented, reproducible, and secure.
Solution for prediction
One important aspect for our setup is that we do not want to have any technology break between preprocessing, learning, and prediction in order to reuse definitions and avoid additional efforts for keeping codebases in sync. Previously, this meant that we used a CherryPy server to provide a RESTful interface for prediction. With our models learned in Scala and Spark, we’ve implemented a Scala-based web server solution in the new system. Although we did not have much prior expertise in JVM-based web frameworks, we found it rather easy to setup a web server in Scala by using the Play framework. Only a few lines of code were needed to set up a real multithreading solution that outperforms the old solution significantly, as we will see in the following.
Comparison
To see if the effort of migration from our previous solution to Scala and Spark really pays off, we compared both systems throughout a series of different experiments.
Learning time
For this comparison we learn a classification model with each system. The Python model is learned on our in-house cluster, processing all data for each submodel on a single machine with 10 cores, scheduling in parallel (on separate machines) the independent submodels. In contrast, the Spark model is learned in a data-distributed setting with one driver and five worker nodes. When comparing the overall learning times, we observe a clear reduction with the Spark setup, i.e. the overall learning time drops by a factor of two.
Prediction time
As already mentioned, one crucial aspect of our system is the ability to perform timely and scalable fraud predictions on new orders. For this purpose, we compare the performance of our old CherryPy-based prediction engine with our new Play-based prediction framework. We set up a load test that sends 5,000 prediction requests to the respective prediction engine using different concurrency levels for the requests. The following figure shows the response time for both prediction engines.
We can see that the Python solution has a much higher response time when the number of concurrent requests is higher than one, while the response time of the Scala engine stays almost constant (even when facing higher concurrency levels). For instance, with 20 concurrent requests it needs on average only ~70 ms compared to ~1000 ms for the Python module.
Accuracy
Great time figures for prediction and learning wouldn’t be worth anything if our new models don’t predict, as well as the old ones did in the first place. There are standard implementations in both Spark and scikit-learn for many linear and nonlinear models that we consider. Feeding them with the same data results in comparable results for the linear models. Somewhat surprisingly, some Spark models lose to scikit-learn by an amount we don’t tolerate. However, we were able to find comparable substitutes for each model we use. An insightful benchmark for classification algorithm implementations is provided by this brilliant post. (The interested reader should follow the link from there to the discussion with one of Spark’s designers). Eventually, different implementation paradigms unavoidably bring about differences in performance.
Optimising with sparse features
When modelling yields features, which can potentially take thousands of different values, sometimes only a tiny fraction of them are present in the data. This is the ideal case to use a sparse representation: Instead of having a 50k-dimensional vector, one would rather encode the non-zero positions in the vector (or use a similar lossless compression technique). While Spark has a sparse vector implementation, we encountered some odd behaviour when using it. It turned out that under the hood the sparse optimizer blows sparse vectors up to dense vectors.
val states = optimizer.iterations(
new CachedDiffFunction(costFun),
initialCoefficientsWithIntercept.toBreeze.toDenseVector
)
In the presence of high-dimensional sparse features, the resulting Hessian matrix ends up being very sparse. Since we encountered different oddities when using sparse features, we suspect that the optimization is numerically unstable because of that. Without further trying to fix the internals of the optimization, we were able to use our flexibly designed framework to solve the problem: We implemented a sparse-to-dense condenser in terms of a preprocessor to our models.
The condenser looks at the whole dataset and for sparse features, it keeps track of the values that are set. It can then throw away all non-set dimensions and shrink the final vectors by a great deal. This resulted not only in better predictions (more than 25% improvement) but also more than halved the runtime.
One of the great aspects of Scala and Spark is that they allow you to write rather complex logic in a very concise way. Hence, we listed the complete implementation below:
case class Condenser() extends LearnablePreprocessor[Map[Int, Int]] {
override def modelGenerator(data: RDD[MllibLabeledPoint])(implicit ctx: Context): Map[Int, Int] = {
val seqOp = (set: Set[Int], lp: MllibLabeledPoint) => lp.features match {
case s: SparseVector => set ++ s.indices.toSet
case d: DenseVector => throw new RuntimeException("Condenser does not work with DenseVectors")
}
val combOp = (set1: Set[Int], set2: Set[Int]) => set1 ++ set2
val nonZeroIndicesGlobal = data.treeAggregate(Set[Int]())(seqOp, combOp)
nonZeroIndicesGlobal.toSeq.sorted.zipWithIndex.toMap
}
override def preprocess(v: Vector): Vector = v match {
case s: SparseVector => {
val condensedIndexMap = getModel
val overlap = s.indices.filter(i => condensedIndexMap.contains(i))
val newKeyMap = overlap.map(i => (condensedIndexMap(i), s(i)))
Vectors.sparse(condensedIndexMap.size, newKeyMap)
}
case d: DenseVector => throw new RuntimeException("Condenser does not work with DenseVectors")
}
}
Lessons learned
With the migration to the new solution, we get rid of most pain points and also gain a lot in terms of performance, as shown before. However, we want to also share some lessons that we learned during our migration to the new framework.
First of all, the transition was not so easy in terms of technology. Coming from a data science background, most of our team is proficient in R or Python, but not in JVM-based languages. Hence, the learning curve for Scala was rather steep in the beginning. The use of Spark also introduced some additional hurdles we had to tackle. Since Spark ships the user code to a set of worker nodes, we had a hard time debugging and spent long sessions on making code serialisable to allow Spark to ship it to workers.
Another part that bugged us was the maturity level of the MLlib library, which we relied on for learning our models. Apparently, the produced models are not always as accurate as the ones you get from a more mature library such as scikit-learn. You really have to take this into account when learning your models and, ideally, compare the ROC curves with a more mature library. We are positive that eventually this will vanish over time in the process of Spark being adopted more widely and more people contributing to the project.
Conclusion
We are designing solutions for fraud detection and prevention, and face the need to make them scale adequately with the future Zalando fashion platform capacity – both for real-time prediction and for training models.
We’ve gone an interesting way of redesigning our previous solution in a more scalable and resilient format, and in this post we’ve shared the lessons that we learned on the way. We started at why our Python-based solution, which uses CherryPy for serving requests, scikit-learn for machine learning models, and number-crunching on an in-house static cluster, does not scale to the new challenges. We arrived at a new design in Scala, using the Play framework to serve requests, Spark for machine learning algorithms, and for running on AWS. This evaluation shows that it scales considerably better in terms of time and versatility. In addition, the strict types, compilation of code, and configuration makes it more secure and more fun to hack.
Credits
Team Payana are Patrick Baier, Stanimir Dragiev, Henning-Ulrich Esser, Andrei Kaigorodov, Tammo Krüger, Philipp Marx, and Oleksandr Volynets. The transition is a team effort with contributions from everyone. The driving force for the redesign is to be attributed in the largest extent to Tammo. You can also check out our presentaton at Spark Summit Europe about our journey from Scikit learn to Spark.
We're hiring! Do you like working in an ever evolving organization such as Zalando? Consider joining our teams as a Data Engineer!