import json
import logging
import tempfile
import uuid
from typing import Optional, Union, Dict, List, Any

import pyarrow as pa
import pyarrow.parquet as pq
from huggingface_hub import CommitScheduler
from huggingface_hub.hf_api import HfApi

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s %(levelname)s:%(message)s')
logger = logging.getLogger(__name__)


def load_scheduler():
    """Build the scheduler used by the app to log interactions to the Hub dataset repo."""
    return ParquetScheduler(
        repo_id="hannahcyberey/Refusal-Steering-Logs",
        every=10,  # minutes between background commits
        private=True,
        squash_history=False,
        schema={
            "session_id": {"_type": "Value", "dtype": "string"},
            "prompt": {"_type": "Value", "dtype": "string"},
            "steering": {"_type": "Value", "dtype": "bool"},
            "coeff": {"_type": "Value", "dtype": "float64"},
            "top_p": {"_type": "Value", "dtype": "float64"},
            "temperature": {"_type": "Value", "dtype": "float64"},
            "output": {"_type": "Value", "dtype": "string"},
            "upvote": {"_type": "Value", "dtype": "bool"},
            "timestamp": {"_type": "Value", "dtype": "string"},
        },
    )
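
# A minimal sketch of how app code might queue one row per generation
# (illustrative only: the handler name and argument values are hypothetical,
# and `datetime`/`timezone` would need to be imported):
#
#   scheduler = load_scheduler()
#
#   def log_generation(session_id, prompt, output, steering, coeff, top_p, temperature):
#       scheduler.append({
#           "session_id": session_id,
#           "prompt": prompt,
#           "steering": steering,
#           "coeff": coeff,
#           "top_p": top_p,
#           "temperature": temperature,
#           "output": output,
#           "upvote": None,  # unknown until the user votes
#           "timestamp": datetime.now(timezone.utc).isoformat(),
#       })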


class ParquetScheduler(CommitScheduler):
    """
    Reference: https://huggingface.co/spaces/Wauplin/space_to_dataset_saver

    Usage:
        Configure the scheduler with a repo id. Once started, you can add data to be
        uploaded to the Hub, and each `.append` call results in one row in the final
        dataset.

    List of possible dtypes:
    https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value

    ```py
    # Start scheduler
    >>> scheduler = ParquetScheduler(
    ...     repo_id="my-parquet-dataset",
    ...     schema={
    ...         "prompt": {"_type": "Value", "dtype": "string"},
    ...         "negative_prompt": {"_type": "Value", "dtype": "string"},
    ...         "guidance_scale": {"_type": "Value", "dtype": "int64"},
    ...         "image": {"_type": "Image"},
    ...     },
    ... )

    # Append some data to be uploaded
    >>> scheduler.append({...})
    ```
    """
    def __init__(
        self,
        *,
        repo_id: str,
        schema: Dict[str, Dict[str, str]],
        every: Union[int, float] = 5,
        path_in_repo: Optional[str] = "data",
        repo_type: Optional[str] = "dataset",
        revision: Optional[str] = None,
        private: bool = False,
        token: Optional[str] = None,
        allow_patterns: Union[List[str], str, None] = None,
        ignore_patterns: Union[List[str], str, None] = None,
        squash_history: bool = False,
        hf_api: Optional[HfApi] = None,
    ) -> None:
        super().__init__(
            repo_id=repo_id,
            folder_path="dummy",  # unused by this subclass; the parent requires a folder to watch
            every=every,
            path_in_repo=path_in_repo,
            repo_type=repo_type,
            revision=revision,
            private=private,
            token=token,
            allow_patterns=allow_patterns,
            ignore_patterns=ignore_patterns,
            squash_history=squash_history,
            hf_api=hf_api,
        )

        self._rows: List[Dict[str, Any]] = []
        self._schema = schema
    def append(self, row: Dict[str, Any]) -> None:
        """Add a new item to be uploaded."""
        with self.lock:
            self._rows.append(row)
    def push_to_hub(self):
        """Called by the parent scheduler on a background thread every `every` minutes."""
        # Snapshot the pending rows and release the lock quickly so `append`
        # is never blocked by the network calls below.
        with self.lock:
            rows = self._rows
            self._rows = []
        if not rows:
            return
        logger.info("Got %d item(s) to commit.", len(rows))

        # Complete rows: every feature in the schema must be present in every row.
        for row in rows:
            for feature in self._schema:
                if feature not in row:
                    row[feature] = None

        # Convert the rows to a pyarrow table.
        table = pa.Table.from_pylist(rows)

        # Embed the `datasets` feature types in the Parquet metadata so the Hub
        # dataset viewer interprets the columns correctly.
        table = table.replace_schema_metadata(
            {"huggingface": json.dumps({"info": {"features": self._schema}})}
        )

        # Write the table to a temporary file and upload it as a uniquely named
        # shard; the context manager cleans the file up even if the upload fails.
        with tempfile.NamedTemporaryFile() as archive_file:
            pq.write_table(table, archive_file.name)
            self.api.upload_file(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                revision=self.revision,
                path_in_repo=f"{uuid.uuid4()}.parquet",
                path_or_fileobj=archive_file.name,
            )
        logger.info("Commit completed.")
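

# A minimal smoke test, assuming you are authenticated with the Hub and have
# write access to the target repo; the row values below are just examples.
if __name__ == "__main__":
    from datetime import datetime, timezone

    scheduler = load_scheduler()
    scheduler.append({
        "session_id": str(uuid.uuid4()),
        "prompt": "example prompt",
        "steering": True,
        "coeff": 0.5,
        "top_p": 0.9,
        "temperature": 0.7,
        "output": "example output",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # "upvote" is omitted on purpose: push_to_hub fills missing features with None.
    })
    scheduler.trigger().result()  # commit immediately instead of waiting for the timer
    scheduler.stop()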