Skip to main content

Integration - Google Cloud Storage

Cloud Storage is a service for storing objects in Google Cloud. By following this guide, you will enable Contextal Platform processing of all objects uploaded to Cloud Storage. Objects that trigger BLOCK or QUARANTINE actions based on your scenarios will be placed in a designated quarantine bucket. Objects that pass clean or are excluded by filters will be placed in separate clean or unscanned buckets, respectively.

These instructions are built upon the official Cloud Storage instructions for integrating ClamAV scanning. However, with Contextal Platform, you'll not only have a hardened ClamAV setup but also benefit from the platform's enhanced capabilities, giving you superior threat detection and data processing options!

Integration Steps

1. Set up your environment, using the following shell script (just update PROJECT_ID, you can also change the names of the target buckets):

export PROJECT_ID=PUT_YOUR_PROJECT_ID
# Use https://cloud.google.com/about/locations to find best suited regions for you.
export REGION=europe-central2
# Location can be single region (us-west1, europe-southwest1, ...) or multi-region (us, eu, ...).
export LOCATION=eu
export SERVICE_NAME="contextal-scanner"
export SERVICE_ACCOUNT="${SERVICE_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
# You can use different names for your buckets. Make sure they are unique.
export BUCKET_UNSCANNED="unscanned-${PROJECT_ID}"
export BUCKET_CLEAN="clean-${PROJECT_ID}"
export BUCKET_QUARANTINED="quarantined-${PROJECT_ID}"

#Initialize the gcloud environment with your project ID.
gcloud config set project "${PROJECT_ID}"

#Create buckets.
gcloud storage buckets create "gs://${BUCKET_UNSCANNED}" --location="${LOCATION}"
gcloud storage buckets create "gs://${BUCKET_CLEAN}" --location="${LOCATION}"
gcloud storage buckets create "gs://${BUCKET_QUARANTINED}" --location="${LOCATION}"

2. Set up the service account for the contextal-scanner service:

# Create account for service.
gcloud iam service-accounts create ${SERVICE_NAME}

# Grant the Object Admin role to the buckets.
gcloud storage buckets add-iam-policy-binding "gs://${BUCKET_UNSCANNED}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" --role=roles/storage.objectAdmin
gcloud storage buckets add-iam-policy-binding "gs://${BUCKET_CLEAN}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" --role=roles/storage.objectAdmin
gcloud storage buckets add-iam-policy-binding "gs://${BUCKET_QUARANTINED}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" --role=roles/storage.objectAdmin

# Grant the Metric Writer role.
gcloud projects add-iam-policy-binding \
"${PROJECT_ID}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" \
--role=roles/monitoring.metricWriter

3. Create file service.py in the current directory with the following content:

from cloudevents.http import from_http
from google.cloud import storage
from flask import Flask, request
import contextal
import os
import tempfile
import time

BUCKET_UNSCANNED="YOUR_UNSCANNED_BUCKET_NAME"
BUCKET_CLEAN="YOUR_CLEAN_BUCKET_NAME"
BUCKET_QUARANTINED="YOUR_QUARANTINED_BUCKET_NAME"
SIZE_LIMIT=200000000

ctx_config_file="service.toml"
app = Flask(__name__)
ctx_platform = None

class ActionsConfig:
def __init__(self, actions_priority, ignored_actions):
action_list = actions_priority.split(",")
if len(action_list) == 0:
raise Exception("--actions-priority is empty")
self.priority = {}
priority = 0
for action in action_list:
self.priority[action] = {
"priority": priority,
"clean": False
}
priority = priority + 1
ignored_list = ignored_actions.split(",")
for action in ignored_list:
if action not in self.priority:
continue
self.priority[action]["clean"]=True
def get(self, action):
if action not in self.priority:
return None
return self.priority[action]

def download_file(storage_client, bucketname, filename):
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
file = tempfile.NamedTemporaryFile()
blob.download_to_file(file)
file.seek(0)
return file

def scan_actions(actions_array, actions_config) -> bool:
result = None
for element in actions_array:
actions = element["actions"]
for action in actions:
scenario=action["scenario"]
action = action["action"]
config = actions_config.get(action)
if config is None:
continue
if result is None or config["priority"] < result["priority"]:
result = {
"action": action,
"scenario": scenario,
"priority": config["priority"],
"clean": config["clean"]
}
return result is not None and not result["clean"]

def scan_file(ctx_platform: contextal.Platform, file, filename) -> bool:
work = ctx_platform.submit_work(object_stream=file, object_name=filename)
work_id = work["work_id"]
while True:
graphs = ctx_platform.get_graphs([work_id])
if graphs[work_id] is not None:
break
time.sleep(0.5)
actions_array = ctx_platform.get_actions(work_id)
actions_config = ActionsConfig("ALLOW,BLOCK,QUARANTINE", "ALLOW")
infected = scan_actions(actions_array, actions_config)
return infected

def move_file(storage_client, filename: str, clean: bool):
source_bucket = storage_client.bucket(BUCKET_UNSCANNED)
source_blob = source_bucket.blob(filename)
destination_bucket = storage_client.bucket(BUCKET_CLEAN if clean else BUCKET_QUARANTINED)
source_bucket.copy_blob(source_blob, destination_bucket, filename)
source_bucket.delete_blob(filename)

@app.route("/", methods=["POST"])
def index():
# return 200 on errors to prevent re-trying
if ctx_platform is None:
msg = "Contextal Platform is not initialized"
print(f"error: {msg}")
return f"Internal Error: {msg}", 200
event = from_http(request.headers, request.get_data())
kind = event.data["kind"]
if kind != "storage#object":
msg = f"Unsupported event kind {kind}"
print(f"error: {msg}")
return f"Internal Error: {msg}", 200
bucket = event.data["bucket"]
name = event.data["name"]
size = int(event.data["size"])
if bucket != BUCKET_UNSCANNED:
msg = f"Unexpected bucket {bucket}"
print(f"error: {msg}")
return f"Internal Error: {msg}", 200
if size > SIZE_LIMIT:
msg = f"file gs://{bucket}/{name} is too large for scanning ({size} bytes)"
print(f"error: {msg}")
return f"Internal Error: {msg}", 200
storage_client = storage.Client()
file = download_file(storage_client, bucket, name)
infected = scan_file(ctx_platform, file, f"gs://{bucket}/{name}")
move_file(storage_client, name, not infected)
return "", 200

if __name__ == "__main__":
platform_config = contextal.Config(ctx_config_file)
platform_config.load_profile(None)
ctx_platform = contextal.Platform(platform_config)
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

and run the following commands/script to get the bucket names automatically updated to the previously specified names:

sed -i 's/YOUR_UNSCANNED_BUCKET_NAME/'"$BUCKET_UNSCANNED"'/' service.py
sed -i 's/YOUR_CLEAN_BUCKET_NAME/'"$BUCKET_CLEAN"'/' service.py
sed -i 's/YOUR_QUARANTINED_BUCKET_NAME/'"$BUCKET_QUARANTINED"'/' service.py

4. Create file service.toml in the current directory with the following content:

default = "default"
[platform.default]
url = YOUR_CLOUD_URL
token = YOUR_CLOUD_TOKEN

This file can be also created using the ctx tool - doing so will automatically verify connection to the platform:

pip install contextal
ctx config create --set-default default YOUR_CLOUD_URL --token YOUR_CLOUD_TOKEN
cp ~/.config/contextal/config.toml service.toml

5. Create file start.sh in the current directory:

#!/bin/bash
. /opt/python/bin/activate
cd /app
python3 service.py

6. Create file Dockerfile in the current directory:

FROM debian
RUN apt update
RUN apt install -y python3-venv python3-pip
RUN python3 -m venv /opt/python
RUN . /opt/python/bin/activate &&\
pip install cloudevents google-cloud-storage flask contextal
RUN mkdir /app
COPY service.toml /app/
COPY service.py /app/
COPY start.sh /app/
CMD ["bash", "/app/start.sh"]

7. Create and deploy the Cloud Run service:

gcloud beta run deploy "${SERVICE_NAME}" \
--source . \
--region "${REGION}" \
--no-allow-unauthenticated \
--memory 4Gi \
--cpu 1 \
--concurrency 20 \
--min-instances 1 \
--max-instances 5 \
--no-cpu-throttling \
--cpu-boost \
--service-account="${SERVICE_ACCOUNT}"

8. Store the Service URL value from the output of the deployment command in a shell variable:

SERVICE_URL="PUT_URL_REPORTED_BY_LAST_COMMAND"

9. Create an Eventarc Cloud Storage trigger:

# Grant the Pub/Sub Publisher role to the Cloud Storage service account.
STORAGE_SERVICE_ACCOUNT=$(gcloud storage service-agent --project="${PROJECT_ID}")
gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
--member "serviceAccount:${STORAGE_SERVICE_ACCOUNT}" \
--role "roles/pubsub.publisher"

# Allow the contextal-scanner service account to invoke the Cloud Run service, and act as an Eventarc event receiver.
gcloud run services add-iam-policy-binding "${SERVICE_NAME}" \
--region="${REGION}" \
--member "serviceAccount:${SERVICE_ACCOUNT}" \
--role roles/run.invoker
gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
--member "serviceAccount:${SERVICE_ACCOUNT}" \
--role "roles/eventarc.eventReceiver"

# Create an Eventarc trigger.
gcloud eventarc triggers create "trigger-${BUCKET_NAME}-${SERVICE_NAME}" \
--destination-run-service="${SERVICE_NAME}" \
--destination-run-region="${REGION}" \
--location="${LOCATION}" \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=${BUCKET_UNSCANNED}" \
--service-account="${SERVICE_ACCOUNT}"

# Change the message acknowledgement deadline to two minutes in the underlying Pub/Sub subscription that's used by the Eventarc trigger.
# The default value of 10 seconds is too short for large files or high loads.
SUBSCRIPTION_NAME=$(gcloud eventarc triggers describe \
"trigger-${BUCKET_NAME}-${SERVICE_NAME}" \
--location="${LOCATION}" \
--format="get(transport.pubsub.subscription)")
gcloud pubsub subscriptions update "${SUBSCRIPTION_NAME}" --ack-deadline=120

10. You're done! The files copied to the Cloud Storage should now be automatically placed in the appropriate buckets.