Integration - Google Cloud Storage
Cloud Storage is a service for storing objects in Google Cloud. By following this guide, you will enable Contextal Platform processing of all objects uploaded to Cloud Storage. Objects that trigger BLOCK or QUARANTINE actions based on your scenarios will be placed in a designated quarantine bucket. Objects that pass clean or are excluded by filters will be placed in separate clean or unscanned buckets, respectively.
These instructions are built upon the official Cloud Storage instructions for integrating ClamAV scanning. However, with Contextal Platform, you'll not only have a hardened ClamAV setup but also benefit from the platform's enhanced capabilities, giving you superior threat detection and data processing options!
Integration Steps
1. Set up your environment using the following shell script (just update PROJECT_ID; you can also change the names of the target buckets):
export PROJECT_ID=PUT_YOUR_PROJECT_ID
# Use https://cloud.google.com/about/locations to find the regions best suited for you.
export REGION=europe-central2
# Location can be single region (us-west1, europe-southwest1, ...) or multi-region (us, eu, ...).
export LOCATION=eu
export SERVICE_NAME="contextal-scanner"
export SERVICE_ACCOUNT="${SERVICE_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
# You can use different names for your buckets. Make sure they are unique.
export BUCKET_UNSCANNED="unscanned-${PROJECT_ID}"
export BUCKET_CLEAN="clean-${PROJECT_ID}"
export BUCKET_QUARANTINED="quarantined-${PROJECT_ID}"
# Initialize the gcloud environment with your project ID.
gcloud config set project "${PROJECT_ID}"
# Create the buckets.
gcloud storage buckets create "gs://${BUCKET_UNSCANNED}" --location="${LOCATION}"
gcloud storage buckets create "gs://${BUCKET_CLEAN}" --location="${LOCATION}"
gcloud storage buckets create "gs://${BUCKET_QUARANTINED}" --location="${LOCATION}"
2. Set up the service account for the contextal-scanner service:
# Create the service account.
gcloud iam service-accounts create "${SERVICE_NAME}"
# Grant the Object Admin role on the buckets.
gcloud storage buckets add-iam-policy-binding "gs://${BUCKET_UNSCANNED}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" --role=roles/storage.objectAdmin
gcloud storage buckets add-iam-policy-binding "gs://${BUCKET_CLEAN}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" --role=roles/storage.objectAdmin
gcloud storage buckets add-iam-policy-binding "gs://${BUCKET_QUARANTINED}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" --role=roles/storage.objectAdmin
# Grant the Metric Writer role.
gcloud projects add-iam-policy-binding \
"${PROJECT_ID}" \
--member="serviceAccount:${SERVICE_ACCOUNT}" \
--role=roles/monitoring.metricWriter
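If you want to double-check the bindings, you can inspect the IAM policy of one of the buckets (shown here for the unscanned bucket as an optional verification):
gcloud storage buckets get-iam-policy "gs://${BUCKET_UNSCANNED}"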
3. Create a file named service.py in the current directory with the following content:
from cloudevents.http import from_http
from google.cloud import storage
from flask import Flask, request
import contextal
import os
import tempfile
import time

BUCKET_UNSCANNED = "YOUR_UNSCANNED_BUCKET_NAME"
BUCKET_CLEAN = "YOUR_CLEAN_BUCKET_NAME"
BUCKET_QUARANTINED = "YOUR_QUARANTINED_BUCKET_NAME"
SIZE_LIMIT = 200000000
ctx_config_file = "service.toml"

app = Flask(__name__)
ctx_platform = None


class ActionsConfig:
    # Maps action names to priorities and marks which actions count as clean.
    def __init__(self, actions_priority, ignored_actions):
        action_list = actions_priority.split(",")
        if len(action_list) == 0:
            raise Exception("actions_priority is empty")
        self.priority = {}
        priority = 0
        for action in action_list:
            self.priority[action] = {
                "priority": priority,
                "clean": False
            }
            priority = priority + 1
        ignored_list = ignored_actions.split(",")
        for action in ignored_list:
            if action not in self.priority:
                continue
            self.priority[action]["clean"] = True

    def get(self, action):
        if action not in self.priority:
            return None
        return self.priority[action]


def download_file(storage_client, bucketname, filename):
    # Download the object into a temporary file and rewind it for reading.
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    file = tempfile.NamedTemporaryFile()
    blob.download_to_file(file)
    file.seek(0)
    return file


def scan_actions(actions_array, actions_config) -> bool:
    # Pick the highest-priority action reported for the work and decide
    # whether the object should be treated as infected.
    result = None
    for element in actions_array:
        actions = element["actions"]
        for action in actions:
            scenario = action["scenario"]
            action = action["action"]
            config = actions_config.get(action)
            if config is None:
                continue
            if result is None or config["priority"] < result["priority"]:
                result = {
                    "action": action,
                    "scenario": scenario,
                    "priority": config["priority"],
                    "clean": config["clean"]
                }
    return result is not None and not result["clean"]


def scan_file(ctx_platform: contextal.Platform, file, filename) -> bool:
    # Submit the object to the platform and poll until processing completes.
    work = ctx_platform.submit_work(object_stream=file, object_name=filename)
    work_id = work["work_id"]
    while True:
        graphs = ctx_platform.get_graphs([work_id])
        if graphs[work_id] is not None:
            break
        time.sleep(0.5)
    actions_array = ctx_platform.get_actions(work_id)
    actions_config = ActionsConfig("ALLOW,BLOCK,QUARANTINE", "ALLOW")
    infected = scan_actions(actions_array, actions_config)
    return infected


def move_file(storage_client, filename: str, clean: bool):
    # Copy the object to the clean or quarantine bucket, then remove the original.
    source_bucket = storage_client.bucket(BUCKET_UNSCANNED)
    source_blob = source_bucket.blob(filename)
    destination_bucket = storage_client.bucket(BUCKET_CLEAN if clean else BUCKET_QUARANTINED)
    source_bucket.copy_blob(source_blob, destination_bucket, filename)
    source_bucket.delete_blob(filename)


@app.route("/", methods=["POST"])
def index():
    # Return 200 even on errors to prevent Eventarc/Pub/Sub from re-trying.
    if ctx_platform is None:
        msg = "Contextal Platform is not initialized"
        print(f"error: {msg}")
        return f"Internal Error: {msg}", 200
    event = from_http(request.headers, request.get_data())
    kind = event.data["kind"]
    if kind != "storage#object":
        msg = f"Unsupported event kind {kind}"
        print(f"error: {msg}")
        return f"Internal Error: {msg}", 200
    bucket = event.data["bucket"]
    name = event.data["name"]
    size = int(event.data["size"])
    if bucket != BUCKET_UNSCANNED:
        msg = f"Unexpected bucket {bucket}"
        print(f"error: {msg}")
        return f"Internal Error: {msg}", 200
    if size > SIZE_LIMIT:
        msg = f"file gs://{bucket}/{name} is too large for scanning ({size} bytes)"
        print(f"error: {msg}")
        return f"Internal Error: {msg}", 200
    storage_client = storage.Client()
    file = download_file(storage_client, bucket, name)
    infected = scan_file(ctx_platform, file, f"gs://{bucket}/{name}")
    move_file(storage_client, name, not infected)
    return "", 200


if __name__ == "__main__":
    platform_config = contextal.Config(ctx_config_file)
    platform_config.load_profile(None)
    ctx_platform = contextal.Platform(platform_config)
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
Then run the following commands to automatically update the bucket names in service.py to the names specified earlier:
sed -i 's/YOUR_UNSCANNED_BUCKET_NAME/'"$BUCKET_UNSCANNED"'/' service.py
sed -i 's/YOUR_CLEAN_BUCKET_NAME/'"$BUCKET_CLEAN"'/' service.py
sed -i 's/YOUR_QUARANTINED_BUCKET_NAME/'"$BUCKET_QUARANTINED"'/' service.py
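If you'd like to smoke-test the service before containerizing it, the sketch below starts it locally and sends a simulated Eventarc event. This assumes the Python dependencies listed in the Dockerfile are installed locally, valid Google Cloud application credentials are available, and an object named test.pdf already exists in the unscanned bucket (all of these are assumptions; adjust as needed):
# Start the service locally (it reads service.toml from the current directory).
python3 service.py &
# Send a CloudEvent in binary HTTP mode, mimicking the Cloud Storage "finalized" event.
curl -X POST localhost:8080 \
  -H "Content-Type: application/json" \
  -H "ce-id: 1234" \
  -H "ce-specversion: 1.0" \
  -H "ce-type: google.cloud.storage.object.v1.finalized" \
  -H "ce-source: //storage.googleapis.com/projects/_/buckets/${BUCKET_UNSCANNED}" \
  -d "{\"kind\": \"storage#object\", \"bucket\": \"${BUCKET_UNSCANNED}\", \"name\": \"test.pdf\", \"size\": \"1024\"}"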
4. Create a file named service.toml in the current directory with the following content:
default = "default"
[platform.default]
url = "YOUR_CLOUD_URL"
token = "YOUR_CLOUD_TOKEN"
This file can also be created using the ctx tool; doing so will automatically verify the connection to the platform:
pip install contextal
ctx config create --set-default default YOUR_CLOUD_URL --token YOUR_CLOUD_TOKEN
cp ~/.config/contextal/config.toml service.toml
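Optionally, you can check that service.toml parses and the profile loads, using the same calls service.py makes at startup (a quick sanity check, not required):
python3 -c 'import contextal; c = contextal.Config("service.toml"); c.load_profile(None); contextal.Platform(c); print("service.toml OK")'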
5. Create a file named start.sh in the current directory:
#!/bin/bash
. /opt/python/bin/activate
cd /app
python3 service.py
6. Create a file named Dockerfile in the current directory:
FROM debian
RUN apt update
RUN apt install -y python3-venv python3-pip
RUN python3 -m venv /opt/python
RUN . /opt/python/bin/activate &&\
pip install cloudevents google-cloud-storage flask contextal
RUN mkdir /app
COPY service.toml /app/
COPY service.py /app/
COPY start.sh /app/
CMD ["bash", "/app/start.sh"]
7. Create and deploy the Cloud Run service:
gcloud beta run deploy "${SERVICE_NAME}" \
--source . \
--region "${REGION}" \
--no-allow-unauthenticated \
--memory 4Gi \
--cpu 1 \
--concurrency 20 \
--min-instances 1 \
--max-instances 5 \
--no-cpu-throttling \
--cpu-boost \
--service-account="${SERVICE_ACCOUNT}"
8. Store the Service URL value from the output of the deployment command in a shell variable:
SERVICE_URL="PUT_URL_REPORTED_BY_LAST_COMMAND"
9. Create an Eventarc Cloud Storage trigger:
# Grant the Pub/Sub Publisher role to the Cloud Storage service account.
STORAGE_SERVICE_ACCOUNT=$(gcloud storage service-agent --project="${PROJECT_ID}")
gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
--member "serviceAccount:${STORAGE_SERVICE_ACCOUNT}" \
--role "roles/pubsub.publisher"
# Allow the contextal-scanner service account to invoke the Cloud Run service, and act as an Eventarc event receiver.
gcloud run services add-iam-policy-binding "${SERVICE_NAME}" \
--region="${REGION}" \
--member "serviceAccount:${SERVICE_ACCOUNT}" \
--role roles/run.invoker
gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
--member "serviceAccount:${SERVICE_ACCOUNT}" \
--role "roles/eventarc.eventReceiver"
# Create an Eventarc trigger.
gcloud eventarc triggers create "trigger-${BUCKET_UNSCANNED}-${SERVICE_NAME}" \
--destination-run-service="${SERVICE_NAME}" \
--destination-run-region="${REGION}" \
--location="${LOCATION}" \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=${BUCKET_UNSCANNED}" \
--service-account="${SERVICE_ACCOUNT}"
# Change the message acknowledgement deadline to two minutes in the underlying Pub/Sub subscription that's used by the Eventarc trigger.
# The default value of 10 seconds is too short for large files or high loads.
SUBSCRIPTION_NAME=$(gcloud eventarc triggers describe \
"trigger-${BUCKET_NAME}-${SERVICE_NAME}" \
--location="${LOCATION}" \
--format="get(transport.pubsub.subscription)")
gcloud pubsub subscriptions update "${SUBSCRIPTION_NAME}" --ack-deadline=120
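To verify the trigger was created and is attached to the unscanned bucket, you can list the Eventarc triggers in the location (an optional check):
gcloud eventarc triggers list --location="${LOCATION}"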
10. You're done! Files uploaded to the unscanned Cloud Storage bucket should now be automatically processed and placed in the appropriate buckets.
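As a final end-to-end check, you can upload any test file to the unscanned bucket and, after a short while, look for it in the clean and quarantined buckets (test.pdf is just a placeholder name):
gcloud storage cp test.pdf "gs://${BUCKET_UNSCANNED}/"
# Give the service a moment to process, then check where the object landed.
gcloud storage ls "gs://${BUCKET_CLEAN}/" "gs://${BUCKET_QUARANTINED}/"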