from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer
import torch
import os
import signal
import random
import numpy as np
import time
from collections import Counter
import warnings
from transformers import logging
# Hide the known-noisy `rope_parameters` warning and restrict transformers'
# own logger to errors so the interactive console stays readable.
warnings.filterwarnings(
    action="ignore",
    message="Unrecognized keys in `rope_parameters`",
)
logging.set_verbosity_error()
def resolve_thread_count(cpu_count):
    """Return half of *cpu_count*, clamped to at least one thread.

    ``os.cpu_count()`` returns ``None`` when the core count cannot be
    determined, and integer-halving a single core yields 0; both cases fall
    back to 1 so the result is always a valid thread count.
    """
    if not cpu_count:
        return 1
    return max(1, cpu_count // 2)


cpu_count = os.cpu_count()
print(f"Number of CPU cores in the system: {cpu_count}")
half_cpu_count = resolve_thread_count(cpu_count)
# NOTE(review): these variables are exported after torch has been imported,
# so they presumably only influence libraries/subprocesses initialised later;
# the torch.set_num_threads call below is what configures torch itself.
os.environ["MKL_NUM_THREADS"] = str(half_cpu_count)
os.environ["OMP_NUM_THREADS"] = str(half_cpu_count)
torch.set_num_threads(half_cpu_count)
print(f"PyTorch threads: {torch.get_num_threads()}")
print(f"MKL threads: {os.getenv('MKL_NUM_THREADS')}")
print(f"OMP threads: {os.getenv('OMP_NUM_THREADS')}")
# Newly created floating-point tensors default to bfloat16 from here on.
torch.set_default_dtype(torch.bfloat16)
# Load the model and tokenizer.
MODEL_ID = "deepseek-ai/DeepSeek-V4-Flash-BF16"
print(f"Load Model {MODEL_ID} ... ")

# Options forwarded to from_pretrained: bfloat16 weights, everything mapped to
# the CPU, with remote model code enabled and an offload directory supplied.
_LOAD_KWARGS = {
    "torch_dtype": torch.bfloat16,
    "device_map": "cpu",
    "low_cpu_mem_usage": True,
    "trust_remote_code": True,
    "offload_folder": "./offload",
}
model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **_LOAD_KWARGS)
# Explicit cast after loading, kept from the original flow.
model = model.to(torch.bfloat16)

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)

# Fallback chat template, currently disabled:
#if tokenizer.chat_template is None:
# tokenizer.chat_template = """<|begin▁of▁sentence|>You are a helpful assistant.{% for message in messages %}{% if message['role'] == 'user' %}<|User|>{{ message['content'] }}<|Assistant|>{% elif message['role'] == 'assistant' %}{{ message['content'] }}<|end▁of▁sentence|>{% endif %}{% endfor %}"""
class CustomTextStreamer(TextStreamer):
    """Print generated text to stdout while recording the text and metrics.

    The streamer keeps the prompt separate from the model's output: the first
    ``put`` call of a generation carries the prompt ids (per the transformers
    streamer protocol), so it is echoed when ``skip_prompt`` is false but is
    never added to ``generated_text`` nor counted in the metrics. Every later
    ``put`` call carries newly generated ids, which are counted directly
    instead of re-encoding the printed text fragments.

    An instance is meant to serve a single ``generate`` call.

    Attributes set here and read by callers:
        generated_text: concatenation of all finalized (printed) output text.
        stop_flag: set by ``stop_generation``; makes the next finalized chunk
            raise ``StopIteration`` to abort generation.
        init_time / first_token_time / end_time: ``time.time()`` timestamps.
        token_count: number of generated token ids received so far.
        think_tokens_count: value of ``token_count`` when ``</think>`` first
            appeared in the output (0 if it never did).
    """

    # Marker whose first appearance in the output ends the "thinking" part.
    THINK_END = "</think>"

    def __init__(self, tokenizer, skip_prompt=True, skip_special_tokens=True):
        super().__init__(tokenizer, skip_prompt=skip_prompt, skip_special_tokens=skip_special_tokens)
        self.generated_text = ""
        self.stop_flag = False
        self.init_time = time.time()  # Record initialization time
        self.end_time = None  # To store end time
        self.first_token_time = None  # To store first token generation time
        self.think_tokens_count = 0  # To track total think tokens
        self.token_count = 0  # To track total tokens
        # Private copies of the options so prompt handling does not depend on
        # how the base class stores them.
        self._echo_prompt = not skip_prompt
        self._skip_special_tokens = skip_special_tokens
        self._awaiting_prompt = True  # the first put() delivers the prompt

    def put(self, value):
        """Receive token ids from ``generate``; separate prompt from output.

        Args:
            value: tensor of token ids; the first call holds the prompt, later
                calls hold newly generated ids.

        Raises:
            ValueError: if the prompt has a batch size greater than one.
            StopIteration: propagated from ``on_finalized_text`` after
                ``stop_generation`` was requested.
        """
        if self._awaiting_prompt:
            self._awaiting_prompt = False
            if not self._echo_prompt:
                # Let the base class perform its own prompt skipping.
                super().put(value)
                return
            if len(value.shape) > 1 and value.shape[0] > 1:
                raise ValueError("TextStreamer only supports batch size 1")
            prompt_ids = value[0] if len(value.shape) > 1 else value
            prompt_text = self.tokenizer.decode(
                prompt_ids.tolist(), skip_special_tokens=self._skip_special_tokens
            )
            # Echo only: the prompt must not reach generated_text or metrics.
            print(prompt_text, end="", flush=True)
            return
        self.token_count += int(value.numel())
        super().put(value)

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Record and print one finalized chunk of generated text.

        Raises:
            StopIteration: when ``stop_generation`` has been called, to break
                out of the surrounding ``generate`` call.
        """
        if self.first_token_time is None and text.strip():  # first non-empty text
            self.first_token_time = time.time()
        self.generated_text += text
        if self.think_tokens_count == 0:
            # A newly completed marker must overlap the chunk just appended,
            # so only the tail needs scanning rather than the whole text.
            window = len(text) + len(self.THINK_END) - 1
            if self.THINK_END in self.generated_text[-window:]:
                self.think_tokens_count = self.token_count
        print(text, end="", flush=True)
        if stream_end:
            self.end_time = time.time()  # Record end time when streaming ends
        if self.stop_flag:
            raise StopIteration

    def stop_generation(self):
        """Request that generation stop at the next finalized chunk."""
        self.stop_flag = True
        self.end_time = time.time()  # Record end time when generation is stopped

    def get_metrics(self):
        """Return a dict of timestamps, latencies, token counts and throughput.

        If no end time was recorded yet, the current time is stored as the end
        time. ``total_time`` spans from construction to end, and
        ``first_token_latency`` is ``None`` when no text was produced.
        """
        if self.end_time is None:
            self.end_time = time.time()  # Set end time if not already set
        total_time = self.end_time - self.init_time
        tokens_per_second = self.token_count / total_time if total_time > 0 else 0
        if self.first_token_time is None:
            first_token_latency = None
        else:
            first_token_latency = self.first_token_time - self.init_time
        return {
            "init_time": self.init_time,
            "first_token_time": self.first_token_time,
            "first_token_latency": first_token_latency,
            "end_time": self.end_time,
            "total_time": total_time,  # Total time in seconds
            "think_tokens_count": self.think_tokens_count,
            "total_tokens": self.token_count,
            "tokens_per_second": tokens_per_second,
        }
def generate_stream(model, tokenizer, messages, thinking_mode, skip_prompt, skip_special_tokens, max_new_tokens):
    """Generate a reply for *messages*, streaming it to stdout.

    Ctrl+C during generation is intercepted: it flags the streamer to stop
    instead of killing the process. The SIGINT handler that was active before
    the call is restored afterwards, whether generation finished, was
    interrupted, or raised.

    Args:
        model: causal LM exposing ``generate`` and ``device``.
        tokenizer: tokenizer providing ``apply_chat_template``.
        messages: chat history as a list of ``{"role", "content"}`` dicts.
        thinking_mode: truthy selects the template's "thinking" mode,
            otherwise "chat".
        skip_prompt: forwarded to the streamer (whether to hide the prompt).
        skip_special_tokens: forwarded to the streamer's decoding.
        max_new_tokens: generation length limit.

    Returns:
        Tuple ``(generated_text, stop_flag, metrics)`` taken from the streamer.
    """
    formatted_prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        thinking_mode="thinking" if thinking_mode else "chat",
        add_generation_prompt=True,
    )
    print(f"formatted_prompt={formatted_prompt}\n")
    # NOTE(review): if the chat template already emits the BOS token, this
    # default tokenizer call may add a second one; confirm against the
    # tokenizer config before passing add_special_tokens=False here.
    toks = tokenizer(
        [formatted_prompt],
        return_tensors="pt",
        return_token_type_ids=False,
    ).to(model.device)
    streamer = CustomTextStreamer(tokenizer, skip_prompt=skip_prompt, skip_special_tokens=skip_special_tokens)

    def signal_handler(sig, frame):
        streamer.stop_generation()
        print("\n[Generation stopped by user with Ctrl+C]")

    previous_handler = signal.signal(signal.SIGINT, signal_handler)
    if previous_handler is None:
        # signal.signal returns None for handlers not installed from Python;
        # None cannot be passed back, so fall back to the default action.
        previous_handler = signal.SIG_DFL
    print("Response: ", end="", flush=True)
    try:
        # The returned ids are not needed; the streamer collects the text.
        model.generate(
            **toks,
            max_new_tokens=max_new_tokens,
            pad_token_id=tokenizer.eos_token_id,
            streamer=streamer,
        )
    except StopIteration:
        # Raised by the streamer once stop_generation() has been requested.
        print("\n[Stopped by user]")
    finally:
        signal.signal(signal.SIGINT, previous_handler)
        del toks
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    return streamer.generated_text, streamer.stop_flag, streamer.get_metrics()
# Interactive chat loop.
#
# Commands: /exit, /clear, /skip_prompt, /skip_special_tokens, /thinking_mode
# (the last three toggle the corresponding flag). Any other non-empty input is
# sent to the model as a user message.
init_messages = [{"role": "system", "content": "You are a helpful assistant."}]
messages = init_messages.copy()
skip_prompt = False
skip_special_tokens = False
thinking_mode = True
MAX_NEW_TOKENS = 40960  # upper bound on tokens generated per reply
while True:
    print(f"skip_prompt: {skip_prompt}")
    print(f"skip_special_tokens: {skip_special_tokens}")
    print(f"thinking_mode: {thinking_mode}")
    try:
        user_input = input("User: ").strip()
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl+C at the prompt ends the session cleanly.
        print("\nExiting chat.")
        break
    command = user_input.lower()
    if command == "/exit":
        print("Exiting chat.")
        break
    if command == "/clear":
        messages = init_messages.copy()
        print("Chat history cleared. Starting a new conversation.")
        continue
    if command == "/skip_prompt":
        skip_prompt = not skip_prompt
        continue
    if command == "/skip_special_tokens":
        skip_special_tokens = not skip_special_tokens
        continue
    if command == "/thinking_mode":
        thinking_mode = not thinking_mode
        continue
    if not user_input:
        print("Input cannot be empty. Please enter something.")
        continue
    messages.append({"role": "user", "content": user_input})
    # Reset each turn; not read in this part of the script.
    # NOTE(review): presumably consumed by code outside this view — confirm.
    activated_experts = []
    response, stop_flag, metrics = generate_stream(
        model, tokenizer, messages, thinking_mode, skip_prompt, skip_special_tokens, MAX_NEW_TOKENS
    )
    print("\n\nMetrics:")
    for key, value in metrics.items():
        print(f" {key}: {value}")
    print("", flush=True)
    if stop_flag:
        # The reply was interrupted: drop the unanswered user turn so the
        # history does not end up with two consecutive user messages.
        messages.pop()
        continue
    messages.append({"role": "assistant", "content": response})