Skip to content

fraud case study example #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fraud/1_return.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Transaction:

# Computed properties
clean_memo: str
is_nsf: bool
is_nsf: bool # non-sufficient funds


@features
Expand Down
5 changes: 5 additions & 0 deletions fraud_case_study/datasources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from chalk.sql import SnowflakeSource
from chalk.streams import KafkaSource

# Streaming source carrying raw transaction events from the
# "transactions" topic (consumed by the stream resolver).
kafka = KafkaSource(name="txns_data_stream", topic="transactions")

# Snowflake warehouse source backing the users.chalk.sql resolver.
snowflake = SnowflakeSource(name="user_db")
43 changes: 43 additions & 0 deletions fraud_case_study/features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Fix: `DataFrame` (used by User.transactions) and `_` (underscore
# expressions, used by Transaction.in_foreign_country) were referenced
# below but never imported.
from chalk.features import DataFrame, FeatureTime, _, features
from chalk.streams import Windowed, windowed


@features
class User:
    # Primary key.
    id: int
    first_name: str
    last_name: str
    email: str
    address: str
    # Compared against Transaction.country by the
    # Transaction.in_foreign_country expression.
    country_of_residence: str

    # these features are aggregated over the last 7, 30, and 90 days
    avg_txn_amount: Windowed[float] = windowed("7d", "30d", "90d")
    num_overdrafts: Windowed[int] = windowed("7d", "30d", "90d")

    # Populated by the get_risk_score resolver via the internal Risk API.
    risk_score: float

    # All Transaction rows joined to this User by transaction.user_id.
    transactions: DataFrame["Transaction"]


@features
class Transaction:
    # these features are loaded directly from the kafka data source
    id: int
    user_id: User.id
    ts: FeatureTime
    vendor: str
    description: str
    amount: float
    # Fix: `string` is not a Python type; the builtin is `str`.
    country: str
    is_overdraft: bool

    # True when the transaction occurred outside the user's home country.
    # Fix: the original used `==`, which flagged *domestic* transactions
    # (country equal to the residence country) instead of foreign ones.
    in_foreign_country: bool = _.country != _.user.country_of_residence





36 changes: 36 additions & 0 deletions fraud_case_study/kafka_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Fix: `datetime` (TransactionMessage.timestamp), `Features` (resolver
# return annotation), and `Transaction` (resolver output) were used
# below but never imported. Grouped stdlib / third-party / local.
from datetime import datetime

from chalk.features import Features
from chalk.streams import stream
from pydantic import BaseModel

from datasources import kafka
from features import Transaction

# Pydantic models define the schema of the messages on the stream.
class TransactionMessage(BaseModel):
    id: int
    user_id: int
    # Event time of the transaction; mapped to Transaction.ts by the
    # stream resolver below.
    timestamp: datetime
    vendor: str
    description: str
    amount: float
    # Country the transaction occurred in.
    country: str
    # Whether this transaction overdrew the account.
    is_overdraft: bool

@stream(source=kafka)
def stream_resolver(message: TransactionMessage) -> Features[
    Transaction.id,
    Transaction.user_id,
    Transaction.ts,
    Transaction.vendor,
    Transaction.description,
    Transaction.amount,
    Transaction.country,
    Transaction.is_overdraft,
]:
    """Map each Kafka TransactionMessage onto Transaction features.

    Fixes:
    - the output annotation listed ``Transaction.timestamp``, but the
      feature class names this field ``ts``;
    - a missing comma after ``user_id=message.user_id`` was a syntax
      error.
    """
    return Transaction(
        id=message.id,
        user_id=message.user_id,  # fix: comma was missing here
        ts=message.timestamp,
        vendor=message.vendor,
        description=message.description,
        amount=message.amount,
        country=message.country,
        is_overdraft=message.is_overdraft,
    )
44 changes: 44 additions & 0 deletions fraud_case_study/other_resolvers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Fix: `User` is referenced in every resolver signature below but was
# never imported. Grouped third-party / local.
from chalk import DataFrame, online

from features import User
from kafka_resolver import TransactionMessage
from risk import riskclient


@online
def get_avg_txn_amount(txns: DataFrame[TransactionMessage]) -> DataFrame[User.id, User.avg_txn_amount]:
    # we define a simple aggregation to calculate the average transaction amount
    # using SQL syntax (https://docs.chalk.ai/docs/aggregations#using-sql)
    # the time filter is pushed down based on the window definition of the feature
    # NOTE(review): the input is typed as DataFrame[TransactionMessage]
    # (the stream payload model) rather than DataFrame[Transaction] —
    # confirm this matches the aggregation docs linked above.
    return f"""
    select
        user_id as id,
        avg(amount) as avg_txn_amount
    from {txns}
    group by 1
    """

@online
def get_num_overdrafts(txns: DataFrame[TransactionMessage]) -> DataFrame[User.id, User.num_overdrafts]:
    # we define a simple aggregation to calculate the number of overdrafts
    # using SQL syntax (https://docs.chalk.ai/docs/aggregations#using-sql)
    # the time filter is pushed down based on the window definition of the feature
    # NOTE(review): `is_overdraft = 1` assumes the warehouse stores the
    # flag as an integer — confirm against the actual column type.
    return f"""
    select
        user_id as id,
        count(*) as num_overdrafts
    from {txns}
    where is_overdraft = 1
    group by 1
    """

@online
def get_risk_score(
    first_name: User.first_name,
    last_name: User.last_name,
    email: User.email,
    address: User.address,
) -> User.risk_score:
    """Fetch the user's latest risk score from the internal Risk API
    based on their personal information.

    Fix: the local variable previously shadowed the imported
    ``riskclient`` module (``riskclient = riskclient.RiskClient()``),
    which is confusing and breaks any later module reference in this
    scope.
    """
    client = riskclient.RiskClient()
    return client.get_risk_score(first_name, last_name, email, address)

5 changes: 5 additions & 0 deletions fraud_case_study/users.chalk.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- resolves: User
-- source: user_db

-- Batch-loads the base (non-computed) User features from the
-- Snowflake source named "user_db".
select id, email, first_name, last_name, address, country_of_residence
from user_db