# Wrapper modules
import icav2
from icav2.api import project_data_api
from icav2.model.problem import Problem
from icav2.model.project_data import ProjectData
# Helper modules
import random
import os
import requests
import string
import hashlib
import getpass# Authenticate using User credentials
username = input("ICA Username")
password = getpass.getpass("ICA Password")
tenant = input("ICA Tenant name")
url = os.environ['ICA_URL'] + '/rest/api/tokens'
r = requests.post(url, data={}, auth=(username,password),params={'tenant':tenant})
token = None
apiClient = None
if r.status_code == 200:
token = r.content
configuration = icav2.Configuration(
host = os.environ['ICA_URL'] + '/rest',
access_token = str(r.json()["token"])
)
apiClient = icav2.ApiClient(configuration, header_name="Content-Type",header_value="application/vnd.illumina.v3+json")
print("Authenticated to %s" % str(os.environ['ICA_URL']))
else:
print("Error authenticating to %s" % str(os.environ['ICA_URL']))
print("Response: %s" % str(r.status_code))
## Authenticate using ICA API TOKEN
configuration = icav2.Configuration(
host = os.environ['ICA_URL'] + '/rest'
)
configuration.api_key['ApiKeyAuth'] = getpass.getpass()
apiClient = icav2.ApiClient(configuration, header_name="Content-Type",header_value="application/vnd.illumina.v3+json")# Retrieve project ID from the Bench workspace environment
projectId = os.environ['ICA_PROJECT']# Create a Project Data API client instance
projectDataApiInstance = project_data_api.ProjectDataApi(apiClient)# List all data in a project
pageOffset = 0
pageSize = 30
try:
projectDataPagedList = projectDataApiInstance.get_project_data_list(project_id = projectId, page_size = str(pageSize), page_offset = str(pageOffset))
totalRecords = projectDataPagedList.total_item_count
while pageOffset*pageSize < totalRecords:
for projectData in projectDataPagedList.items:
print("Path: "+projectData.data.details.path + " - Type: "+projectData.data.details.data_type)
pageOffset = pageOffset + 1
except icav2.ApiException as e:
print("Exception when calling ProjectDataAPIApi->get_project_data_list: %s\n" % e)# Create data element in a project
data = icav2.model.create_data.CreateData(name="test.txt",data_type = "FILE")
try:
projectData = projectDataApiInstance.create_data_in_project(projectId, create_data=data)
fileId = projectData.data.id
except icav2.ApiException as e:
print("Exception when calling ProjectDataAPIApi->create_data_in_project: %s\n" % e)## Upload a local file to a data element in a project
# Create a local file in a Bench workspace
filename = '/tmp/'+''.join(random.choice(string.ascii_lowercase) for i in range(10))+".txt"
content = ''.join(random.choice(string.ascii_lowercase) for i in range(100))
f = open(filename, "a")
f.write(content)
f.close()
# Calculate MD5 hash (optional)
localFileHash = md5Hash = hashlib.md5((open(filename, 'rb').read())).hexdigest()
try:
# Get Upload URL
upload = projectDataApiInstance.create_upload_url_for_data(project_id = projectId, data_id = fileId)
# Upload dummy file
files = {'file': open(filename, 'r')}
data = open(filename, 'r').read()
r = requests.put(upload.url , data=data)
except icav2.ApiException as e:
print("Exception when calling ProjectDataAPIApi->create_upload_url_for_data: %s\n" % e)
# Delete local dummy file
os.remove(filename)## Download a data element from a project
try:
# Get Download URL
download = projectDataApiInstance.create_download_url_for_data(project_id=projectId, data_id=fileId)
# Download file
filename = '/tmp/'+''.join(random.choice(string.ascii_lowercase) for i in range(10))+".txt"
r = requests.get(download.url)
open(filename, 'wb').write(r.content)
# Verify md5 hash
remoteFileHash = hashlib.md5((open(filename, 'rb').read())).hexdigest()
if localFileHash != remoteFileHash:
print("Error: MD5 mismatch")
# Delete local dummy file
os.remove(filename)
except icav2.ApiException as e:
print("Exception when calling ProjectDataAPIApi->create_download_url_for_data: %s\n" % e)# Search for matching data elements in a project
try:
projectDataPagedList = projectDataApiInstance.get_project_data_list(project_id = projectId, full_text="test.txt")
for projectData in projectDataPagedList.items:
print("Path: " + projectData.data.details.path + " - Name: "+projectData.data.id + " - Type: "+projectData.data.details.data_type)
except icav2.ApiException as e:
print("Exception when calling ProjectDataAPIApi->get_project_data_list: %s\n" % e)# Delete matching data elements in a project
try:
projectDataPagedList = projectDataApiInstance.get_project_data_list(project_id = projectId, full_text="test.txt")
for projectData in projectDataPagedList.items:
print("Deleting file "+projectData.data.details.path)
projectDataApiInstance.delete_data(project_id = projectId, data_id = projectData.data.id)
except icav2.ApiException as e:
print("Exception %s\n" % e)# API modules
import icav2
from icav2.api import project_base_api
from icav2.model.problem import Problem
from icav2.model.base_connection import BaseConnection
# Helper modules
import os
import requests
import getpass
import snowflake.connector# Retrieve project ID from the Bench workspace environment
projectId = os.environ['ICA_PROJECT']# Create a Project Base API client instance
projectBaseApiInstance = project_base_api.ProjectBaseApi(apiClient)# Get a Base Access Token
try:
baseConnection = projectBaseApiInstance.create_base_connection_details(project_id = projectId)
except icav2.ApiException as e:
print("Exception when calling ProjectBaseAPIApi->create_base_connection_details: %s\n" % e)
## Create a python jdbc connection
ctx = snowflake.connector.connect(
account=os.environ["ICA_SNOWFLAKE_ACCOUNT"],
authenticator=baseConnection.authenticator,
token=baseConnection.access_token,
database=os.environ["ICA_SNOWFLAKE_DATABASE"],
role=baseConnection.role_name,
warehouse=baseConnection.warehouse_name
)
ctx.cursor().execute("USE "+os.environ["ICA_SNOWFLAKE_DATABASE"])## Create a Table
tableName = "test_table"
ctx.cursor().execute("CREATE OR REPLACE TABLE " + tableName + "(col1 integer, col2 string)")## Insert data into a table
ctx.cursor().execute(
"INSERT INTO " + tableName + "(col1, col2) VALUES " +
" (123, 'test string1'), " +
" (456, 'test string2')")## Query the table
cur = ctx.cursor()
try:
cur.execute("SELECT * FROM "+tableName)
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()# Delete the table
ctx.cursor().execute("DROP TABLE " + tableName);