Coverage for src/flag_gems/runtime/backend/_kunlunxin/ops/copy.py: 0%

64 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2026-06-10 07:09 +0800

1import logging 

2from typing import Optional 

3 

4import torch 

5import triton 

6 

7from ..utils.codegen_config_utils import CodeGenConfig 

8from ..utils.pointwise_dynamic import pointwise_dynamic 

9 

# Module logger, created as a child of the shared "flag_gems" logger and named
# after this module (any leading dots in ``__name__`` are stripped first).
logger = logging.getLogger("flag_gems").getChild(__name__.lstrip("."))

# Dispatch key set handed to ``redispatch`` whenever ``copy_`` defers to
# PyTorch's own ``aten::copy_`` implementation instead of the Triton kernel.
_FALLBACK_KEYSET = torch._C.DispatchKeySet(
    torch._C.DispatchKey.CompositeExplicitAutograd
)

15 

# Code-generation settings used only by ``copy_slice``.
# NOTE(review): the four positional values follow CodeGenConfig's own
# parameter order, which is declared in ``..utils.codegen_config_utils`` and is
# not visible from this module -- confirm their meaning there before tuning.
config_ = CodeGenConfig(
    512,
    (65536, 65536, 65536),
    32,
    True,
    prefer_1d_tile=True,
    is_scatter_slice=True,
)

24 

25 

26# @pointwise_dynamic(is_tensor=(True,), promotion_methods=[(0, "DEFAULT")]) 

27# @triton.jit 

28# def copy(src): 

29# return src 

30 

31 

# Identity pointwise kernel: the Triton body returns its single tensor input
# unchanged. It is wrapped by ``pointwise_dynamic`` with the module-level
# ``config_`` settings and the "DEFAULT" promotion method on argument 0.
@pointwise_dynamic(
    is_tensor=(True,),
    promotion_methods=[(0, "DEFAULT")],
    config=config_,
)
@triton.jit
def copy_slice(src):
    return src

38 

39 

# Identity pointwise kernel used by ``copy_``: returns its single tensor input
# unchanged. Wrapped by ``pointwise_dynamic`` without a custom config and with
# the "DEFAULT" promotion method on argument 0.
@pointwise_dynamic(
    is_tensor=[True],
    promotion_methods=[(0, "DEFAULT")],
)
@triton.jit
def _copy_kernel(src):
    return src

44 

45 

def _can_use_triton(dst: torch.Tensor, src: torch.Tensor) -> bool:
    """Return True when ``copy_`` may use the Triton kernel for this pair.

    The kernel path is rejected (``False``) when any of the following holds:

    * either tensor has a non-strided layout,
    * the tensors live on different devices,
    * either tensor is quantized,
    * either tensor has a complex dtype,
    * ``src`` is not contiguous.

    Args:
        dst: Destination tensor of the copy.
        src: Source tensor of the copy.

    Returns:
        ``True`` if none of the rejecting conditions apply, else ``False``.
    """
    # Layout is checked first, before any other property is queried.
    both_strided = dst.layout == torch.strided and src.layout == torch.strided
    if not both_strided:
        return False

    same_device = dst.device == src.device
    any_quantized = dst.is_quantized or src.is_quantized
    # Triton on kunlunxin does not support complex dtypes; fall back to PyTorch.
    any_complex = src.is_complex() or dst.is_complex()

    if not same_device or any_quantized or any_complex:
        return False
    return src.is_contiguous()

59 

60 

def _expand_like(src: torch.Tensor, target_shape: torch.Size) -> torch.Tensor:
    """Return ``src`` viewed with ``target_shape``.

    Args:
        src: Tensor to view.
        target_shape: Shape the result must have.

    Returns:
        ``src`` itself when its shape already equals ``target_shape``;
        otherwise the result of ``src.expand(target_shape)``.
    """
    already_matches = src.shape == target_shape
    return src if already_matches else src.expand(target_shape)

65 

66 

def copy(
    template: torch.Tensor,
    src: torch.Tensor,
    *,
    non_blocking: Optional[bool] = False,
):
    """Copy ``src`` into a newly allocated tensor laid out like ``template``.

    A fresh tensor is allocated with ``template``'s size, stride, dtype and
    device, then filled by the in-place ``copy_``.

    Args:
        template: Tensor whose size, stride, dtype and device the result takes.
        src: Tensor providing the values.
        non_blocking: Forwarded to ``copy_`` after coercion with ``bool()``,
            so ``None`` is treated as ``False``.

    Returns:
        The newly allocated tensor holding the copied values.
    """
    logger.debug("GEMS_KUNLUNXIN COPY")
    result = torch.empty_strided(
        template.size(),
        template.stride(),
        dtype=template.dtype,
        device=template.device,
    )
    copy_(result, src, non_blocking=bool(non_blocking))
    return result

76 

77 

def _redispatch_copy_(dst: torch.Tensor, src: torch.Tensor, non_blocking: bool):
    """Run PyTorch's ``aten::copy_`` for ``dst``/``src`` via ``_FALLBACK_KEYSET``."""
    return torch.ops.aten.copy_.default.redispatch(
        _FALLBACK_KEYSET, dst, src, non_blocking
    )


def copy_(dst: torch.Tensor, src: torch.Tensor, non_blocking: bool = False):
    """Copy ``src`` into ``dst`` in place and return ``dst``.

    The Triton kernel is used only when ``dst`` and ``src`` do not alias each
    other, ``_can_use_triton`` accepts the pair, and ``dst`` is non-empty.
    Every other case that is not a no-op is redispatched to PyTorch's own
    ``aten::copy_``.

    Args:
        dst: Destination tensor, written in place.
        src: Source tensor; must be broadcastable to ``dst``'s shape.
        non_blocking: Forwarded to the PyTorch fallback; not used by the
            Triton path.

    Returns:
        ``dst`` on the no-op and Triton paths; otherwise whatever the
        ``zero_()`` call or the PyTorch fallback returns.

    Raises:
        TypeError: If ``src`` is not a ``torch.Tensor``.
        RuntimeError: If ``dst`` is a ZeroTensor, or if ``src`` cannot be
            broadcast to ``dst``'s shape on the Triton path.
    """
    if not isinstance(src, torch.Tensor):
        raise TypeError("src must be a Tensor")

    # this is the same as PyTorch's check
    if dst._is_zerotensor():
        raise RuntimeError("ZeroTensors are immutable. Call clone() before copy_.")
    if src._is_zerotensor():
        return dst.zero_()

    if torch._C._is_alias_of(dst, src):
        # Align with PyTorch: if metadata fully matches, this is a no-op.
        is_same_view = (
            dst.storage_offset() == src.storage_offset()
            and dst.stride() == src.stride()
            and dst.size() == src.size()
            and dst.dtype == src.dtype
            and dst.device == src.device
            and dst.is_conj() == src.is_conj()
            and dst.is_neg() == src.is_neg()
        )
        if is_same_view:
            return dst
        # Otherwise defer to PyTorch for well-defined semantics on overlapping writes.
        return _redispatch_copy_(dst, src, non_blocking)

    # Unsupported tensor pairs go to PyTorch. Empty destinations do too, so
    # that PyTorch still validates the broadcast for them.
    if not _can_use_triton(dst, src) or dst.numel() == 0:
        return _redispatch_copy_(dst, src, non_blocking)

    logger.debug("GEMS_KUNLUNXIN COPY_")

    # broadcast_shapes raises RuntimeError itself on incompatible shapes, so
    # it is left to propagate directly instead of being caught and re-raised.
    broadcast_shape = torch.broadcast_shapes(dst.shape, src.shape)
    if torch.Size(broadcast_shape) != dst.shape:
        raise RuntimeError(
            f"The broadcast shape {broadcast_shape} does not match destination shape {tuple(dst.shape)}"
        )

    expanded_src = _expand_like(src, dst.shape)

    # Instantiate the kernel for the source rank and write into dst.
    overload = _copy_kernel.instantiate(expanded_src.ndim)
    overload(expanded_src, out0=dst)
    return dst
132 return dst