Summary

We will write a Python script that collects raw endpoint data through the Carbon Black EDR API. We will then manipulate this raw data to extract useful features, and process those features to convert categorical data into numeric form and to reduce dimensionality. Finally, we will fit an isolation forest and a one-class SVM to the dataset to identify outliers.

Collect endpoint data

We will use the Carbon Black EDR Python API to collect raw endpoint data. The CB EDR API accepts search queries and returns process information made up of different fields. We will extract some of these fields to use as features. Most of these fields can be passed to our next function unchanged; the exception is datetime objects, each of which we will convert into multiple integers (weekday, hour and minute).

def collect_process_features(query):
    """Build one feature row per network connection event matched by *query*.

    Runs *query* against Carbon Black EDR, walks the events of every
    returned process and keeps only the network connection events.
    Each row is laid out as:

        [url,
         process_name, parent_name, hostname, host_type, os_type, username,
         <datetime features of process.start>,
         direction, proto, local_ip, local_port, proxy_ip, proxy_port,
         remote_ip, remote_port,
         <datetime features of event.timestamp>]

    The rows are handed to ``save_data`` before being returned.
    """
    process_fields = ("process_name", "parent_name", "hostname", "host_type", "os_type", "username")
    netconn_fields = ("direction", "proto", "local_ip", "local_port", "proxy_ip", "proxy_port", "remote_ip", "remote_port")

    api = CbResponseAPI()
    matches = api.select(Process).where(query)

    rows = []
    for proc in tqdm(matches):
        # Link back to the process in the EDR console; kept as the first column of every row.
        link = f"https://[EDR ADDRESS]/#/analyze/{proc.id}/{proc.current_segment}?cb.legacy_5x_mode=false"
        proc_part = [getattr(proc, name) for name in process_fields]
        proc_part += get_datetime_features(proc.start)

        for event in proc.all_events_segment:
            if event.event_type == "Cb Network Connection event":
                conn_part = [getattr(event, name) for name in netconn_fields]
                conn_part += get_datetime_features(event.timestamp)
                rows.append([link, *proc_part, *conn_part])

    save_data(rows)
    return rows

Process features

We now need to do some processing on our dataset, namely converting categorical features into numeric features. Additionally, we can do basic dimensionality reduction by removing any features with no variance.

def process(data):
    """One-hot encode the feature rows and drop zero-variance columns.

    The first element of every row (the URL) is stripped before encoding.
    Returns the output of ``VarianceThreshold(threshold=0).fit_transform``.
    """
    rows_without_url = [row[1:] for row in data]
    encoder = OneHotEncoder()
    one_hot = encoder.fit_transform(rows_without_url)
    selector = VarianceThreshold(threshold=0)
    return selector.fit_transform(one_hot)

Outlier detection

We can now fit an isolation forest and a one-class SVM to our dataset. Each function returns an array containing the predicted class label for each sample in the dataset; a label of -1 marks an outlier.

def isolation_forest(X, n, c):
    """Fit an isolation forest to *X* and return the per-sample predictions.

    Args:
        X: Feature matrix with samples on axis 0 and features on axis 1.
        n: Number of base estimators in the ensemble.
        c: Contamination setting passed straight to ``IsolationForest``.

    Returns:
        The result of ``IsolationForest.fit_predict(X)``.
    """
    # Let every tree draw from all columns. The count must come from the
    # matrix being fitted (axis 1 = features), not from a module global or
    # from the number of samples.
    model = IsolationForest(n_estimators=n, contamination=c, max_features=X.shape[1])
    return model.fit_predict(X)

def oc_svm(X, nu):
    """Fit a one-class SVM with the given *nu* to *X* and return its predictions."""
    detector = OneClassSVM(nu=nu)
    labels = detector.fit_predict(X)
    return labels

Full code is shown below:

"""
Performs unsupervised outlier detection on process and network EDR data
Examples: 
python <FILEPATH> -q 'logon_type:interactive AND parent_name:cmd.exe'
python <FILEPATH> -q 'process_name:powershell*.exe childproc_count:[1 TO *]' -c 0.34
python <FILEPATH> -q 'parent_name:winword.exe netcon_count:[1 TO *]' --nu 0.3 -t 300
python <FILEPATH> -q 'parent_name:winword.exe netcon_count:[1 TO *]' --reuse --nu 0.23 -t 300
"""
import datetime
import argparse
import pickle
from tqdm import tqdm
from cbapi.response import *
from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_selection import VarianceThreshold
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM

def get_datetime_features(dt):
    """Break a datetime into the list ``[weekday, hour, minute]``."""
    day_of_week = dt.weekday()
    return [day_of_week, dt.hour, dt.minute]

def save_data(data, path="save.pkl"):
    """Pickle *data* to the filesystem.

    Args:
        data: Any picklable object (the script passes the collected rows).
        path: Destination file; defaults to ``save.pkl`` in the current
            working directory, which is where ``load_data`` looks by default.
    """
    with open(path, "wb") as f:
        pickle.dump(data, f)

def load_data(path="save.pkl"):
    """Load previously pickled data from the filesystem.

    Args:
        path: File to read; defaults to ``save.pkl`` in the current working
            directory.

    Returns:
        The unpickled object.
    """
    with open(path, "rb") as f:
        # NOTE: unpickling can execute arbitrary code. Only load files that
        # this script wrote itself, never a pickle from an untrusted source.
        return pickle.load(f)

def collect_process_features(query):
    """Collect features from the processes matched by a Carbon Black EDR query.

    Walks the events of every returned process and keeps only the network
    connection events, producing one row per such event. Each row is laid
    out as:

        [url,
         process_name, parent_name, hostname, host_type, os_type, username,
         <datetime features of process.start>,
         direction, proto, local_ip, local_port, proxy_ip, proxy_port,
         remote_ip, remote_port,
         <datetime features of event.timestamp>]

    The rows are handed to ``save_data`` before being returned.
    """
    process_fields = ("process_name", "parent_name", "hostname", "host_type", "os_type", "username")
    netconn_fields = ("direction", "proto", "local_ip", "local_port", "proxy_ip", "proxy_port", "remote_ip", "remote_port")

    api = CbResponseAPI()
    matches = api.select(Process).where(query)

    rows = []
    for proc in tqdm(matches):
        # Link back to the process in the EDR console; kept as the first column of every row.
        link = f"https://[EDR ADDRESS]/#/analyze/{proc.id}/{proc.current_segment}?cb.legacy_5x_mode=false"
        proc_part = [getattr(proc, name) for name in process_fields]
        proc_part += get_datetime_features(proc.start)

        for event in proc.all_events_segment:
            if event.event_type == "Cb Network Connection event":
                conn_part = [getattr(event, name) for name in netconn_fields]
                conn_part += get_datetime_features(event.timestamp)
                rows.append([link, *proc_part, *conn_part])

    save_data(rows)
    return rows

def process(data):
    """One-hot encode the dataset and remove constant features.

    The first element of every row (the URL) is skipped before encoding.
    Returns the output of ``VarianceThreshold(threshold=0).fit_transform``.
    """
    rows_without_url = [row[1:] for row in data]
    encoder = OneHotEncoder()
    one_hot = encoder.fit_transform(rows_without_url)
    selector = VarianceThreshold(threshold=0)
    return selector.fit_transform(one_hot)

def isolation_forest(X, n, c):
    """Fit an isolation forest to *X* and return the per-sample predictions.

    Args:
        X: Feature matrix with samples on axis 0 and features on axis 1.
        n: Number of base estimators in the ensemble.
        c: Contamination setting passed straight to ``IsolationForest``.

    Returns:
        The result of ``IsolationForest.fit_predict(X)``.
    """
    # Let every tree draw from all columns. The count must come from the
    # matrix being fitted (axis 1 = features), not from a module global or
    # from the number of samples.
    model = IsolationForest(n_estimators=n, contamination=c, max_features=X.shape[1])
    return model.fit_predict(X)

def oc_svm(X, nu):
    """Fit a one-class SVM with the given *nu* to *X* and return its predictions."""
    detector = OneClassSVM(nu=nu)
    labels = detector.fit_predict(X)
    return labels

def print_predictions(predictions, name, data=None):
    """Pretty print the rows that were predicted to be outliers.

    Args:
        predictions: Sequence of class labels, one per row; rows labelled
            ``-1`` are printed.
        name: Heading printed before the outliers.
        data: Rows aligned with *predictions*, each with the URL as its
            first element. When omitted, the module-level ``process_data``
            populated by the script is used.
    """
    if data is None:
        # Backward-compatible default: the script keeps the collected rows
        # in the module-level ``process_data``.
        data = process_data

    print(name)
    for index, label in enumerate(predictions):
        if label == -1:
            row = data[index]
            print(row[0])
            print(row[1:], end="\n\n")

# Command-line interface. Numeric options are converted by argparse so that a
# malformed value produces a usage error instead of a traceback later on.
parser = argparse.ArgumentParser()
parser.add_argument("-r", "--reuse", action="store_true", help="Reuse previous data")
parser.add_argument("-q", "--query", help="Carbon Black EDR query to run")
parser.add_argument("-a", "--algo", help="The learning algorithm to use (iso-forest, svm)")
parser.add_argument("-t", "--trees", type=int, default=100, help="Number of base estimators in ensemble")
parser.add_argument("-c", "--contamination", type=float, help="Defines treshold for decision boundary")
parser.add_argument("-n", "--nu", type=float, default=0.5, help="Lower bound for support vectors and upper bound for number of samples on wrong side of hyperplane")

args = parser.parse_args()

# `process_data` and `features` are deliberately module-level: other functions
# in this file read them as globals.
# NOTE(review): when neither --reuse nor --query is given, `args.query` is None
# and is passed to the EDR query as-is -- confirm whether that should be rejected.
process_data = load_data() if args.reuse else collect_process_features(args.query)

features = process(process_data)

# Without an explicit -c the isolation forest is given contamination="auto".
contamination = args.contamination if args.contamination is not None else "auto"

# "iso-forest" runs only the forest, "svm" runs only the SVM, and anything
# else (including no --algo at all) runs both, forest first.
if args.algo != "svm":
    print_predictions(isolation_forest(features, args.trees, contamination), "Isolation Forest")
if args.algo != "iso-forest":
    print_predictions(oc_svm(features, args.nu), "One Class SVM")

References

- https://developer.carbonblack.com/resources/query_overview.pdf
- https://cbapi.readthedocs.io/en/latest/
- https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html
- https://scikit-learn.org/stable/modules/generated/sklearn.svm.OneClassSVM.html
- https://scikit-learn.org/stable/modules/outlier_detection.html
- https://scikit-learn.org/stable/modules/feature_extraction.html
- https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html
- https://towardsdatascience.com/machine-learning-with-datetime-feature-engineering-predicting-healthcare-appointment-no-shows-5e4ca3a85f96